package queue;

/**
 * Test class to test the blocking queue.
 * @author Winston Prakash
 */
public class BlockingQueueTester {
    private final BlockingQueue blockingQueue = new BlockingQueue();
    
    public void test(){
        System.out.println("BlockingQueueTester finished\n");
        // Create the 5 consumer threads as workers and start them.
        // The workers threads will be blocked until commands
        // are available in the queue.
        Thread[] workers = new Thread[5];
        for (int i = 0; i < workers.length; i++){
            workers[i] = new Worker("Worker Thread " + i, blockingQueue);
            workers[i].start();
        }
        
        // Produce the 10 runnable commands and enque them in the blocking queue.
        // Each runnable commands randomly sleeps up to 5 seconds
        for (int i = 0; i < 10; i++) {
            final String msg = "Task " + i + " completed";
            blockingQueue.enqueue(new Runnable() {
                public void run() {
                    System.out.println(msg);
                    try {
                        long sleepSecs = (long)(Math.random() * 5000);
                        Thread.sleep(sleepSecs);
                    } catch(InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }
            });
        }
        
        // Enque some "poison pill" commands onto the blocking queue one 
        // for each worker thread. The action taken by these runnbale commnads
        // is to stop the current thread, thus ending all the worker thread
        for (int i = 0; i < workers.length; i++) {
            blockingQueue.enqueue(new Runnable() {
                public void run() {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // Join the main thread with all the Worker threads, so
        // that main thread continues only after all the worker threads are
        // finished.
        for (int i=0; i<workers.length; i++) {
            try {
                workers[i].join();
            } catch(InterruptedException ex) {
                ex.printStackTrace();
            }
        }
        System.out.println("\nBlockingQueueTester finished");
    }
    
    public static void main(String[] args) throws Exception{
        BlockingQueueTester blockingQueueTester = new BlockingQueueTester();
        blockingQueueTester.test();
    }
    
    /**
     * The consumer worker thread. On starting it deques a runnable command
     * from the blocking queue and runs the runnable.  
     */
    private class Worker extends Thread {
        BlockingQueue blockingQueue;
        public Worker(String name, BlockingQueue blockingQueue) { 
            super(name); 
            this.blockingQueue = blockingQueue;
        }
        
        public void run() {
            try {
                while(!isInterrupted()) {
                    ((Runnable)blockingQueue.dequeue()).run();
                }
            } catch(InterruptedException ex) {}
            System.out.println(getName() + " finished");
        }
    }
}