e-semyonov
11/27/2013 - 6:11 PM

BlockingQueue.java

public class BlockingQueue implements Queue {
 
    private java.util.Queue queue = new java.util.LinkedList();
 
    /**
     * Make a blocking Dequeue call so that we'll only return when the queue has
     * something on it, otherwise we'll wait until something is put on it.
     * 
     * @returns  This will return null if the thread wait() call is interrupted.
     */
    public synchronized Object dequeue() {
        Object msg = null;
        while (queue.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                // Error return the client a null item
                return msg;
            }
        }
        msg = queue.remove();
        return msg;
    }
 
    /**
     * Enqueue will add an object to this queue, and will notify any waiting
     * threads that there is an object available.
     */
    public synchronized void enqueue(Object o) {
        queue.add(o);
        // Wake up anyone waiting for something to be put on the queue.
        notifyAll();
    }
 
}
public class Consumer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null; 
 
    public void process(Object msg) {
        try {
            //process message non-trivially (IE: it takes awhile).
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
 
 
    public void run() {
        while(true) { 
            doStuff();
        }
    }
 
 
    public void doStuff() {
        Object msg = queue.dequeue();
        process(msg);
    }
 
}
public class Producer implements Runnable {
    // This will be assigned in the constructor
    private Queue queue = null;
 
    public void run() {
        // Binds to socket, reads messages in
        // packages message calls doSomething()
        // doSomething(Object msg);
    }
 
    public void doSomething(Object msg) {
        queue.enqueue(msg);
    }
}