// BlockingQueueTester.java
package queue;
/**
 * Test driver for the blocking queue.
 *
 * <p>Starts a fixed number of consumer worker threads, enqueues a batch of
 * runnable commands for them to execute, then enqueues one "poison pill"
 * command per worker so that every worker terminates, and finally waits for
 * all workers to finish.
 *
 * @author Winston Prakash
 */
public class BlockingQueueTester {

    /** Number of consumer threads started by {@link #test()}. */
    private static final int WORKER_COUNT = 5;

    /** Number of ordinary commands enqueued by {@link #test()}. */
    private static final int TASK_COUNT = 10;

    /** Exclusive upper bound, in milliseconds, of the random sleep of each command. */
    private static final int MAX_SLEEP_MILLIS = 5000;

    /** Queue shared between the producer (the calling thread) and the workers. */
    private final BlockingQueue blockingQueue = new BlockingQueue();

    /**
     * Runs the producer/consumer scenario and returns once every worker
     * thread has terminated.
     *
     * <p>If the calling thread is interrupted while waiting for the workers,
     * it keeps waiting until all of them have finished and then restores its
     * interrupt status before returning.
     */
    public void test() {
        System.out.println("BlockingQueueTester started\n");

        // Create the consumer threads and start them. Per the original
        // design notes the workers block until commands are available in
        // the queue (behaviour of BlockingQueue.dequeue(), defined elsewhere).
        Thread[] workers = new Thread[WORKER_COUNT];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("Worker Thread " + i, blockingQueue);
            workers[i].start();
        }

        // Produce the runnable commands and enqueue them in the blocking queue.
        // Each command prints its message and then sleeps for a random time
        // below MAX_SLEEP_MILLIS.
        // NOTE(review): the "completed" message is printed before the
        // simulated work (the sleep), not after it.
        for (int i = 0; i < TASK_COUNT; i++) {
            final String msg = "Task " + i + " completed";
            blockingQueue.enqueue(new Runnable() {
                @Override
                public void run() {
                    System.out.println(msg);
                    try {
                        long sleepMillis = (long) (Math.random() * MAX_SLEEP_MILLIS);
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException ex) {
                        // Thread.sleep cleared the interrupt flag when it threw;
                        // set it again so the worker loop notices and stops.
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        // Enqueue one "poison pill" command per worker thread. Each pill
        // interrupts the thread that executes it; the worker loop checks the
        // interrupt flag after every command and therefore exits.
        for (int i = 0; i < workers.length; i++) {
            blockingQueue.enqueue(new Runnable() {
                @Override
                public void run() {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Wait for all worker threads so that the calling thread continues
        // only after every worker has finished. An interruption must not make
        // us skip a worker, so retry the join and remember the interrupt.
        boolean interrupted = false;
        for (Thread worker : workers) {
            boolean joined = false;
            while (!joined) {
                try {
                    worker.join();
                    joined = true;
                } catch (InterruptedException ex) {
                    interrupted = true;
                }
            }
        }
        if (interrupted) {
            // Hand the interruption back to the caller.
            Thread.currentThread().interrupt();
        }

        System.out.println("\nBlockingQueueTester finished");
    }

    /**
     * Entry point: creates a tester and runs {@link #test()}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility; nothing in this method throws a checked exception
     */
    public static void main(String[] args) throws Exception {
        BlockingQueueTester blockingQueueTester = new BlockingQueueTester();
        blockingQueueTester.test();
    }

    /**
     * The consumer worker thread. Once started it repeatedly dequeues a
     * runnable command from the blocking queue and runs it, until the
     * thread's interrupt flag is set or the dequeue is interrupted.
     *
     * <p>Declared {@code static} because it does not use any state of the
     * enclosing {@code BlockingQueueTester} instance.
     */
    private static final class Worker extends Thread {

        /** Queue this worker takes its commands from. */
        private final BlockingQueue taskQueue;

        /**
         * @param name          thread name, also used in the "finished" message
         * @param blockingQueue queue to consume commands from
         */
        public Worker(String name, BlockingQueue blockingQueue) {
            super(name);
            this.taskQueue = blockingQueue;
        }

        /**
         * Executes queued commands until this thread is interrupted, then
         * prints "&lt;name&gt; finished".
         */
        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    ((Runnable) taskQueue.dequeue()).run();
                }
            } catch (InterruptedException ignored) {
                // Interrupted while dequeuing: this is a request to stop,
                // so fall through and let the thread end.
            }
            System.out.println(getName() + " finished");
        }
    }
}