lucamilan
11/17/2015 - 7:12 PM

Synchronized Queue, blocks on Enqueue if the maximum size is reached.

A synchronized, bounded queue: Enqueue blocks while the queue is at its maximum size, and Dequeue blocks while the queue is empty.

using System.Collections.Generic;
using System.Threading;

namespace Utilities
{
    /// <summary>
    /// Thread-safe, bounded FIFO queue. <see cref="Enqueue"/> blocks while the queue
    /// already holds the maximum number of items; <see cref="Dequeue"/> and
    /// <see cref="TryDequeue"/> block while the queue is empty, until an item arrives
    /// or <see cref="Close"/> is called.
    /// </summary>
    /// <remarks>
    /// All state is guarded by a single monitor (the private inner queue); producers
    /// and consumers both wait on that same monitor.
    /// </remarks>
    /// <typeparam name="T">Type of the items held in the queue.</typeparam>
    public sealed class SynchronizedQueue<T>
    {
        private readonly Queue<T> _queue = new Queue<T>();
        private readonly int _maxSize;
        private bool _closing;

        /// <summary>
        /// Creates an empty queue that holds at most <paramref name="maxSize"/> items.
        /// </summary>
        /// <param name="maxSize">Maximum number of queued items; must be greater than zero.</param>
        /// <exception cref="System.ArgumentOutOfRangeException">
        /// <paramref name="maxSize"/> is zero or negative.
        /// </exception>
        public SynchronizedQueue(int maxSize)
        {
            if (maxSize <= 0)
            {
                // With a non-positive limit "Count >= maxSize" is always true,
                // so every Enqueue would wait forever. Fail fast instead.
                throw new System.ArgumentOutOfRangeException(
                    "maxSize", maxSize, "Maximum size must be greater than zero.");
            }

            _maxSize = maxSize;
        }

        /// <summary>
        /// Appends an item, blocking the calling thread while the queue is full.
        /// </summary>
        /// <remarks>
        /// <see cref="Close"/> does not release a producer blocked here: the wait
        /// only ends once a consumer removes an item. Items can still be enqueued
        /// after <see cref="Close"/> has been called.
        /// </remarks>
        /// <param name="item">Item to append.</param>
        public void Enqueue(T item)
        {
            lock (_queue)
            {
                while (_queue.Count >= _maxSize)
                {
                    Monitor.Wait(_queue);
                }

                _queue.Enqueue(item);

                // Consumers only wait while the queue is empty, so the
                // empty -> non-empty transition is the only one that can
                // have waiters to wake.
                if (_queue.Count == 1)
                {
                    Monitor.PulseAll(_queue);
                }
            }
        }

        /// <summary>
        /// Removes and returns the oldest item, blocking while the queue is empty.
        /// </summary>
        /// <returns>
        /// The dequeued item, or <c>default(T)</c> if the queue is empty and
        /// <see cref="Close"/> has been called. Use <see cref="TryDequeue"/> when
        /// <c>default(T)</c> can also be a legitimate item.
        /// </returns>
        public T Dequeue()
        {
            // TryDequeue assigns default(T) on the closed-and-empty path,
            // which is exactly the value this method returns in that case.
            T item;
            TryDequeue(out item);
            return item;
        }

        /// <summary>
        /// Marks the queue as closing and wakes every waiting thread so that
        /// consumers blocked on an empty queue can return.
        /// </summary>
        /// <remarks>
        /// Items still in the queue remain available to consumers. Producers
        /// blocked in <see cref="Enqueue"/> are woken but go back to waiting
        /// while the queue is still full.
        /// </remarks>
        public void Close()
        {
            lock (_queue)
            {
                _closing = true;
                Monitor.PulseAll(_queue);
            }
        }

        /// <summary>
        /// Removes the oldest item, blocking while the queue is empty and not closed.
        /// </summary>
        /// <param name="value">
        /// Receives the dequeued item, or <c>default(T)</c> when the method returns <c>false</c>.
        /// </param>
        /// <returns>
        /// <c>true</c> if an item was dequeued; <c>false</c> if the queue is empty
        /// and <see cref="Close"/> has been called.
        /// </returns>
        public bool TryDequeue(out T value)
        {
            lock (_queue)
            {
                while (_queue.Count == 0)
                {
                    if (_closing)
                    {
                        value = default(T);
                        return false;
                    }

                    Monitor.Wait(_queue);
                }

                value = _queue.Dequeue();

                // Producers only wait while the queue is full, so the
                // full -> not-full transition is the only one that can
                // have waiters to wake.
                if (_queue.Count == _maxSize - 1)
                {
                    Monitor.PulseAll(_queue);
                }

                return true;
            }
        }
    }
}