Synchronized Queue, blocks on Enqueue if the maximum size is reached.
using System.Collections.Generic;
using System.Threading;
namespace Utilities
{
/// <summary>
/// Synchronized queue which blocks on dequeue.
/// </summary>
/// <typeparam name="T"></typeparam>
public sealed class SynchronizedQueue<T>
{
private readonly Queue<T> _queue = new Queue<T>();
private readonly int _maxSize;
private bool _closing;
public SynchronizedQueue(int maxSize)
{
_maxSize = maxSize;
}
/// <summary>
/// Enqueue an item and wake any blocked threads.
/// </summary>
/// <param name="item"></param>
public void Enqueue(T item)
{
lock (_queue)
{
while (_queue.Count >= _maxSize)
{
Monitor.Wait(_queue);
}
_queue.Enqueue(item);
if (_queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(_queue);
}
}
}
/// <summary>
/// Dequeue an item if any, block current thread otherwise.
/// </summary>
/// <returns></returns>
public T Dequeue()
{
lock (_queue)
{
while (_queue.Count == 0)
{
if (_closing)
{
return default(T);
}
Monitor.Wait(_queue);
}
T item = _queue.Dequeue();
if (_queue.Count == _maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(_queue);
}
return item;
}
}
/// <summary>
/// Unblock waiting threads on dequeue for a clean exit.
/// </summary>
public void Close()
{
lock (_queue)
{
_closing = true;
Monitor.PulseAll(_queue);
}
}
/// <summary>
/// Dequeue an item if any, block current thread otherwise.
/// </summary>
/// <param name="value"></param>
/// <returns></returns>
public bool TryDequeue(out T value)
{
lock (_queue)
{
while (_queue.Count == 0)
{
if (_closing)
{
value = default(T);
return false;
}
Monitor.Wait(_queue);
}
value = _queue.Dequeue();
if (_queue.Count == _maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(_queue);
}
return true;
}
}
}
}