co89757
11/27/2018 - 7:58 AM

Producer-Consumer Template class

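A simple abstract base class for producer-consumer (P-C) style processing: work items are queued by producers and processed in parallel by a background consumer.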

namespace DEMon
{
    /// <summary>
    /// Abstract base class that facilitates building processors that follow the producer-consumer pattern.
    /// Work items are buffered in a bounded queue and handed to <see cref="Process"/> in parallel by a
    /// background task, which starts consuming only after <see cref="Init"/> has completed.
    /// Typical invocation sequence on an instance of a derived class:
    /// <code>
    /// processor.Init();
    /// processor.Add(workItem);
    /// processor.CompleteAndWait();
    /// </code>
    /// </summary>
    /// <typeparam name="T">Type of the work item.</typeparam>
    public abstract class ProducerComsumer<T>
    {
        private readonly BlockingCollection<T> queue;
        private readonly ManualResetEvent initFinished;
        private readonly ManualResetEvent doneUpload;

        // Failure raised by the background consumer, if any. It is written before doneUpload is
        // signalled and read only after doneUpload has been waited on.
        private System.Runtime.ExceptionServices.ExceptionDispatchInfo processingError;

        /// <summary>
        /// True once <see cref="CompleteAndWait"/> has returned normally, i.e. the queue was drained
        /// and no processing error was recorded.
        /// </summary>
        public bool Done { get; private set; }

        /// <summary>Maximum number of items the queue holds before <see cref="Add"/> blocks.</summary>
        public const int QueueCapacity = 87352;

        /// <summary>
        /// Creates the bounded queue and starts the background consumer. The consumer does not
        /// touch the queue until <see cref="Init"/> has been called.
        /// </summary>
        /// <param name="maxParallism">
        /// Maximum number of concurrent <see cref="Process"/> calls; passed to
        /// <c>ParallelOptions.MaxDegreeOfParallelism</c>.
        /// </param>
        protected ProducerComsumer(int maxParallism = 64)
        {
            // Built eagerly so an invalid degree of parallelism fails here, on the caller's thread,
            // instead of on the background task where it would go unnoticed.
            var options = new ParallelOptions() { MaxDegreeOfParallelism = maxParallism };

            queue = new BlockingCollection<T>(QueueCapacity);
            doneUpload = new ManualResetEvent(false);
            initFinished = new ManualResetEvent(false);
            Done = false;
            Task.Run(() =>
            {
                try
                {
                    initFinished.WaitOne();

                    // NoBuffering: hand each item to a worker as soon as it is dequeued rather than
                    // letting the default partitioner block while it fills a chunk.
                    var source = Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
                    Parallel.ForEach(source, options, Process);
                }
                catch (System.Exception ex)
                {
                    // Keep the failure so CompleteAndWait can surface it to the caller.
                    processingError = System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(ex);
                }
                finally
                {
                    // Always signal completion; otherwise a failing Process would leave
                    // CompleteAndWait blocked forever.
                    doneUpload.Set();
                }
            });
        }

        /// <summary>
        /// Handles a single work item. Invoked by the background consumer, possibly for several
        /// items concurrently.
        /// </summary>
        /// <param name="item">The work item taken from the queue.</param>
        protected abstract void Process(T item);

        /// <summary>
        /// Enqueues a work item. Blocks while the queue already holds <see cref="QueueCapacity"/> items.
        /// </summary>
        /// <param name="item">The work item to enqueue.</param>
        public void Add(T item)
        {
            queue.Add(item);
        }

        /// <summary>
        /// Derived-class setup hook, run by <see cref="Init"/> before the consumer is released.
        /// </summary>
        protected abstract void Initialize();

        /// <summary>
        /// Runs <see cref="Initialize"/> and then releases the background consumer so that it
        /// starts processing queued items.
        /// </summary>
        public void Init()
        {
            Initialize();
            initFinished.Set();
        }

        /// <summary>
        /// Marks the queue as complete for adding (if not already done) and blocks until the
        /// background consumer has finished. If <see cref="Init"/> was never called the consumer
        /// never starts, and this call does not return.
        /// </summary>
        /// <exception cref="System.AggregateException">
        /// Rethrown when one or more <see cref="Process"/> calls failed; <see cref="Done"/> stays false.
        /// </exception>
        public void CompleteAndWait()
        {
            if (!queue.IsAddingCompleted)
            {
                queue.CompleteAdding();
            }

            doneUpload.WaitOne();

            if (processingError != null)
            {
                // Rethrow the consumer's failure with its original stack trace.
                processingError.Throw();
            }

            Done = true;
        }

        /// <summary>
        /// Marks the queue as not accepting any more items, without waiting for processing to finish.
        /// </summary>
        public void CompleteAdding()
        {
            queue.CompleteAdding();
        }

        /// <summary>Returns the number of items currently waiting in the queue.</summary>
        public long QueueSize()
        {
            return queue.Count;
        }

    }
}