harin
3/22/2013 - 3:54 PM

hw7finally.java

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

public class AlienCommDivideAndConcur {
  private static int NUM_THREADS = 4;
	public static class SignalReader extends Thread{
		private BlockingQueue<ArrayList<AlienLib.Garble>> toPutIn;
		private final CountDownLatch latch;
		
		public SignalReader(BlockingQueue<ArrayList<AlienLib.Garble>> a , CountDownLatch latch){
			toPutIn = a;
			this.latch = latch;
		}
		
		public void run(){
			try{
				while(AlienLib.isAlienTransmitting()){
					 ArrayList<AlienLib.Garble> signals = AlienLib.getAlienSignals();
					 toPutIn.add(signals);
				 }
				System.out.println("read all done");
				latch.countDown();
			} catch (Exception e){
				System.out.println("Reader err"+e);
			}
			
			 
		}
	}
	public static class SignalProcessor extends Thread{
		private BlockingQueue<ArrayList<AlienLib.Garble>> toProcess;
		private BlockingQueue<ArrayList<AlienLib.Data>> toPutIn;
		private ExecutorService executor ;
		private int pass;
		private final CountDownLatch latch;

		
		public SignalProcessor(BlockingQueue<ArrayList<AlienLib.Garble>> in,BlockingQueue<ArrayList<AlienLib.Data>> out, CountDownLatch latch){
			toProcess = in;
			toPutIn = out;
			executor = Executors.newFixedThreadPool(NUM_THREADS);
			pass =0;
			this.latch = latch;
		}
		public void run(){
			try{
				while(pass<7){
					//Get things from queue
					ArrayList<AlienLib.Garble> signals = toProcess.take();
					//do some work
					int dataSize = signals.size();
		            int partitionSize = dataSize/ NUM_THREADS;
		            
		            int from =0;
		            int to =0;
		            List<SignalTaskExecutor> workers = new LinkedList<SignalTaskExecutor>();
		            for(int j = 0; j < NUM_THREADS-1; j++)
		            {
		                from = j * partitionSize;
		                to   = from + partitionSize;

		                workers.add( new SignalTaskExecutor(signals, from, to) );
		            }
		            workers.add(new SignalTaskExecutor(signals, to, signals.size()));
		            
		            List<Future<ArrayList<AlienLib.Data>>> results = executor.invokeAll(workers);
		            
		            ArrayList<AlienLib.Data> data = new ArrayList<AlienLib.Data>();
	                for(Future<ArrayList<AlienLib.Data>> result: results){
	                	data.addAll(result.get());
	                }
	                //put result back into another queue
	                toPutIn.add(data);
	                pass++;
	        
				}
				System.out.println("process all done");
				latch.countDown();
			} catch (Exception e){
				System.out.println("Signal err:"+e.getMessage());
			}
		}
	}
	
	public static class PMTransmitter extends Thread{
		private BlockingQueue<ArrayList<AlienLib.Data>> toProcess;
		private int i;
		private int pass;
		private final CountDownLatch latch;

		
		public PMTransmitter(BlockingQueue<ArrayList<AlienLib.Data>> in, CountDownLatch latch){
			toProcess = in;
			i=0;
			pass =0;
			this.latch= latch;
			
		}
		
		public void run(){
			try{
				while(pass<7){
					ArrayList<AlienLib.Data> data = toProcess.take();
					AlienLib.transmitToPrimeMinisterOffice(data);
		            System.out.println("Pass: "+ i++ +" with "+ data.size()+" signals.");
		            pass++;
				}
				System.out.println("transmit all done");
				latch.countDown();
			} catch (Exception e){
				System.out.print("PM err:" +e);
			}
			
		}
	}
	
    public static void main(String[] args)
    {
    	
    	ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
    	
    	final long start = System.nanoTime();
        try {
            System.out.println("Initiaing communication system...");
            
            BlockingQueue<ArrayList<AlienLib.Garble>> garble = new LinkedBlockingQueue<ArrayList<AlienLib.Garble>>(100);
    		BlockingQueue<ArrayList<AlienLib.Data>> data = new LinkedBlockingQueue<ArrayList<AlienLib.Data>>(100);
    		CountDownLatch l = new CountDownLatch(3);
    		
    		Thread stage1 = new SignalReader(garble,l);
    		Thread stage2 = new SignalProcessor(garble, data,l);
    		Thread stage3 = new PMTransmitter(data,l);
    		
    		stage1.start();
    		stage2.start();
    		stage3.start();
    		
    		l.await();

            System.out.println("Done processing alien's data.");
            
            final long end = System.nanoTime();
        	System.out.println("Time taken: "+(end-start)/1.0e6);
        } catch(Exception e) {
            System.out.println(e.getMessage());
        }
    }
    
    //change Garble to Data
    //return ArrayList of AlienLib.Data
    public static class SignalTaskExecutor implements Callable<ArrayList<AlienLib.Data>> {
        private ArrayList<AlienLib.Garble> data;
        private int from;
        private int to;

        SignalTaskExecutor(ArrayList<AlienLib.Garble> data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        public ArrayList<AlienLib.Data> call() throws Exception{
            ArrayList<AlienLib.Data> result = new ArrayList<AlienLib.Data>();

            for (int i = from; i < to; i++) {
                result.add(AlienLib.processAlienSignal(data.get(i)));
            }

            return result;
        }
    }
}