lucamilan
4/30/2016 - 8:32 AM

Producer/Consumer implementation with .NET 4 BlockingCollection

Producer/Consumer implementation with .NET 4 BlockingCollection

using System;
using System.Threading;
using System.Threading.Tasks;

namespace ProducerConsumer
{
    /// <summary>
    /// Runs a message-handling delegate on a dedicated long-running task and owns the
    /// <see cref="CancellationTokenSource"/> used to ask that delegate to stop.
    /// </summary>
    class QueueWorker : IDisposable
    {
        private readonly CancellationTokenSource _cancelTokenSource = new CancellationTokenSource();
        private readonly Task _messageHandlerTask;
        private bool _disposed;

        /// <summary>
        /// Creates the worker and immediately starts <paramref name="messageHandler"/> on a long-running task.
        /// </summary>
        /// <param name="messageHandler">
        /// Delegate to run. Its single argument is this worker's <see cref="CancellationToken"/>
        /// (boxed as <see cref="object"/>); the delegate is expected to return once that token is cancelled.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is <c>null</c>.</exception>
        public QueueWorker(Action<object> messageHandler)
        {
            if (messageHandler == null)
            {
                throw new ArgumentNullException("messageHandler");
            }

            Id = Guid.NewGuid().ToString();

            // This is the (action, state, options) overload: the token is handed to the delegate as its
            // state object. It is NOT registered as the task's own cancellation token.
            _messageHandlerTask = Task.Factory.StartNew(messageHandler, _cancelTokenSource.Token, TaskCreationOptions.LongRunning);
        }

        /// <summary>
        /// Unique identifier of this worker (a GUID string generated at construction).
        /// </summary>
        public string Id
        {
            get;
            private set;
        }

        /// <summary>
        /// Requests cancellation and blocks until the handler delegate has returned.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The worker has already been disposed.</exception>
        /// <exception cref="AggregateException">The handler delegate threw an exception.</exception>
        public void Stop()
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            _cancelTokenSource.Cancel();
            _messageHandlerTask.Wait();
        }

        /// <summary>
        /// Signals cancellation and releases the resources held by this worker. Safe to call more than
        /// once and safe to call while the handler is still running; it does not wait for the handler
        /// to finish (call <see cref="Stop"/> first when that is required).
        /// </summary>
        public void Dispose()
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // Make sure a still-running handler is told to exit before its token source goes away.
            _cancelTokenSource.Cancel();

            // Task.Dispose throws InvalidOperationException unless the task has reached a final state,
            // so only dispose it once the handler has actually returned.
            if (_messageHandlerTask.IsCompleted)
            {
                _messageHandlerTask.Dispose();
            }

            _cancelTokenSource.Dispose();
        }
    }
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

namespace ProducerConsumer
{
    /// <summary>
    /// Producer/consumer queue service. Producers add messages with <see cref="Queue"/>; a configurable
    /// number of background workers take messages off the queue and pass each one to the handler
    /// supplied to the constructor.
    /// </summary>
    /// <typeparam name="TMessage">Type of the queued messages.</typeparam>
    public  class QueueService<TMessage> : IDisposable where TMessage : class
    {
        private readonly BlockingCollection<TMessage> _messages = new BlockingCollection<TMessage>();
        private readonly List<QueueWorker> _workers = new List<QueueWorker>();
        private readonly Action<TMessage> _messageHandler;
        private bool _disposed;

        /// <summary>
        /// Initializes a new instance of the <see cref="QueueService{TMessage}"/> class.
        /// </summary>
        /// <param name="messageHandler">The message handler invoked by the workers for every non-null message.</param>
        /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is <c>null</c>.</exception>
        public QueueService(Action<TMessage> messageHandler)
        {
            // Fail here rather than with a NullReferenceException on a worker thread later.
            if (messageHandler == null)
            {
                throw new ArgumentNullException("messageHandler");
            }

            _messageHandler = messageHandler;
        }

        /// <summary>
        /// Queues the specified message.
        /// </summary>
        /// <param name="message">The message.</param>
        public void Queue(TMessage message)
        {
            _messages.Add(message);
        }

        /// <summary>
        /// How many messages are left in the queue to be sent
        /// </summary>
        public int QueueLength
        {
            get { return _messages.Count; }
        }

        /// <summary>
        /// Gets a value indicating whether this instance is running.
        /// </summary>
        /// <value>
        /// 	<c>true</c> if this instance is running; otherwise, <c>false</c>.
        /// </value>
        public bool IsRunning { get; private set; }

        /// <summary>
        /// How many workers are currently running
        /// </summary>
        public int NumberOfWorkers
        {
            get { return _workers.Count; }
        }

        /// <summary>
        /// Starts the specified number of workers ready to send queued messages
        /// </summary>
        /// <param name="numberOfWorkers">Number of Workers</param>
        public void Start(int numberOfWorkers = 1)
        {
            IsRunning = true;

            SetNumberOfWorkers(numberOfWorkers);
        }

        /// <summary>
        /// Stops all workers and the service, without waiting for queued messages to be sent.
        /// </summary>
        public void Stop()
        {
            IsRunning = false;

            SetNumberOfWorkers(0);
        }

        /// <summary>
        /// Increase or decrease the number of workers to process queued messages.
        /// Surplus workers are stopped (waiting for their current message to finish) and disposed.
        /// </summary>
        /// <param name="value">New Value for the number of workers to use</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
        /// <exception cref="ObjectDisposedException">The service is disposed and <paramref name="value"/> is greater than zero.</exception>
        public void SetNumberOfWorkers(int value)
        {
            if (value < 0)
            {
                throw new ArgumentOutOfRangeException("value", value, "The number of workers cannot be negative.");
            }

            // Shrinking to zero after disposal is harmless; creating new workers is not.
            if (_disposed && value > 0)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            lock (_workers)
            {
                while (_workers.Count > value)
                {
                    // Detach the worker first so a failing Stop() cannot leave it stuck at index 0.
                    var surplus = _workers[0];
                    _workers.RemoveAt(0);

                    try
                    {
                        surplus.Stop();
                    }
                    finally
                    {
                        // Release the worker's task and cancellation source instead of leaking them.
                        surplus.Dispose();
                    }
                }

                while (_workers.Count < value)
                {
                    _workers.Add(new QueueWorker(ProcessMessages));
                }
            }
        }

        /// <summary>
        /// Worker loop: takes messages off the queue and hands each non-null one to the message
        /// handler until the worker's cancellation token is signalled.
        /// </summary>
        /// <param name="state">The worker's <see cref="CancellationToken"/>, boxed.</param>
        private void ProcessMessages(object state)
        {
            var cancelToken = (CancellationToken)state;

            while (!cancelToken.IsCancellationRequested)
            {
                TMessage message;

                try
                {
                    // Pass the token so a worker idling on an empty queue wakes up when it is
                    // stopped; a plain Take() would block forever and hang the caller of Stop().
                    message = _messages.Take(cancelToken);
                }
                catch (OperationCanceledException)
                {
                    return;
                }

                if (message != null)
                {
                    _messageHandler(message);
                }
            }
        }

        ~QueueService()
        {
            Dispose(false);
        }

        /// <summary>
        /// Releases the resources held by the service.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the finalizer,
        /// in which case no managed objects are touched.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            _disposed = true;

            // On the finalizer path the queue and workers may already have been finalized,
            // and this class owns no unmanaged resources of its own.
            if (!disposing)
            {
                return;
            }

            IsRunning = false;

            lock (_workers)
            {
                // Workers must be stopped before the queue they are reading from is disposed.
                foreach (var worker in _workers)
                {
                    try
                    {
                        worker.Stop();
                    }
                    catch (AggregateException)
                    {
                        // The message handler faulted on this worker. Dispose must not throw,
                        // so the fault is deliberately not propagated from here.
                    }
                    finally
                    {
                        worker.Dispose();
                    }
                }

                _workers.Clear();
            }

            _messages.Dispose();
        }

        /// <summary>
        /// Stops all workers and releases the queue. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }
}