senpost
4/25/2011 - 4:07 AM

Producer and Consumer Queue using multiple threads

Producer and Consumer Queue using multiple threads

class ProducerConsumerQueue<T> : IDisposable
{
    Queue<T> queue = new Queue<T>();
    Thread consumerThread;
    //AutoResetEvent allows threads to communicate with each other by signaling
    AutoResetEvent waitHandle = new AutoResetEvent(false);

    readonly object lockObject = new object();
    bool shouldStop = false; //Stop flag

    public ProducerConsumerQueue()
    {
        consumerThread = new Thread(Consume);
        consumerThread.Start();
    }

    public void Enqueue(T item)
    {
        lock (lockObject) queue.Enqueue(item);
        waitHandle.Set();
    }

    void Consume()
    {
        while (!shouldStop)
        {
            T item = default(T);
            lock (lockObject)
            {
                if (queue.Any())
                    item = queue.Dequeue();
            }

            if (item != null)
            {
                //Do work
                Console.WriteLine("Processed item " + item);
                Thread.Sleep(1000);
            }
            else
            {
                waitHandle.WaitOne(); //No items, So wait for signal
                //Signal received, i.e items added to the queue
            }
        }
    }

    public void Dispose()
    {
        Console.WriteLine("Disposing");
        lock (lockObject) shouldStop = true;
        consumerThread.Join();
        waitHandle.Close();
    }
}

//Using ProducerConsumerQueue
ProducerConsumerQueue<string> queue = new ProducerConsumerQueue<string>();
queue.Enqueue("String1");
queue.Enqueue("String2");
queue.Enqueue("String3");

Console.ReadLine();
queue.Dispose();