import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
public class AlienCommDivideAndConcur {
private static int NUM_THREADS = 4;
public static class SignalReader extends Thread{
private BlockingQueue<ArrayList<AlienLib.Garble>> toPutIn;
private final CountDownLatch latch;
public SignalReader(BlockingQueue<ArrayList<AlienLib.Garble>> a , CountDownLatch latch){
toPutIn = a;
this.latch = latch;
}
public void run(){
try{
while(AlienLib.isAlienTransmitting()){
ArrayList<AlienLib.Garble> signals = AlienLib.getAlienSignals();
toPutIn.add(signals);
}
System.out.println("read all done");
latch.countDown();
} catch (Exception e){
System.out.println("Reader err"+e);
}
}
}
public static class SignalProcessor extends Thread{
private BlockingQueue<ArrayList<AlienLib.Garble>> toProcess;
private BlockingQueue<ArrayList<AlienLib.Data>> toPutIn;
private ExecutorService executor ;
private int pass;
private final CountDownLatch latch;
public SignalProcessor(BlockingQueue<ArrayList<AlienLib.Garble>> in,BlockingQueue<ArrayList<AlienLib.Data>> out, CountDownLatch latch){
toProcess = in;
toPutIn = out;
executor = Executors.newFixedThreadPool(NUM_THREADS);
pass =0;
this.latch = latch;
}
public void run(){
try{
while(pass<7){
//Get things from queue
ArrayList<AlienLib.Garble> signals = toProcess.take();
//do some work
int dataSize = signals.size();
int partitionSize = dataSize/ NUM_THREADS;
int from =0;
int to =0;
List<SignalTaskExecutor> workers = new LinkedList<SignalTaskExecutor>();
for(int j = 0; j < NUM_THREADS-1; j++)
{
from = j * partitionSize;
to = from + partitionSize;
workers.add( new SignalTaskExecutor(signals, from, to) );
}
workers.add(new SignalTaskExecutor(signals, to, signals.size()));
List<Future<ArrayList<AlienLib.Data>>> results = executor.invokeAll(workers);
ArrayList<AlienLib.Data> data = new ArrayList<AlienLib.Data>();
for(Future<ArrayList<AlienLib.Data>> result: results){
data.addAll(result.get());
}
//put result back into another queue
toPutIn.add(data);
pass++;
}
System.out.println("process all done");
latch.countDown();
} catch (Exception e){
System.out.println("Signal err:"+e.getMessage());
}
}
}
public static class PMTransmitter extends Thread{
private BlockingQueue<ArrayList<AlienLib.Data>> toProcess;
private int i;
private int pass;
private final CountDownLatch latch;
public PMTransmitter(BlockingQueue<ArrayList<AlienLib.Data>> in, CountDownLatch latch){
toProcess = in;
i=0;
pass =0;
this.latch= latch;
}
public void run(){
try{
while(pass<7){
ArrayList<AlienLib.Data> data = toProcess.take();
AlienLib.transmitToPrimeMinisterOffice(data);
System.out.println("Pass: "+ i++ +" with "+ data.size()+" signals.");
pass++;
}
System.out.println("transmit all done");
latch.countDown();
} catch (Exception e){
System.out.print("PM err:" +e);
}
}
}
public static void main(String[] args)
{
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
final long start = System.nanoTime();
try {
System.out.println("Initiaing communication system...");
BlockingQueue<ArrayList<AlienLib.Garble>> garble = new LinkedBlockingQueue<ArrayList<AlienLib.Garble>>(100);
BlockingQueue<ArrayList<AlienLib.Data>> data = new LinkedBlockingQueue<ArrayList<AlienLib.Data>>(100);
CountDownLatch l = new CountDownLatch(3);
Thread stage1 = new SignalReader(garble,l);
Thread stage2 = new SignalProcessor(garble, data,l);
Thread stage3 = new PMTransmitter(data,l);
stage1.start();
stage2.start();
stage3.start();
l.await();
System.out.println("Done processing alien's data.");
final long end = System.nanoTime();
System.out.println("Time taken: "+(end-start)/1.0e6);
} catch(Exception e) {
System.out.println(e.getMessage());
}
}
//change Garble to Data
//return ArrayList of AlienLib.Data
public static class SignalTaskExecutor implements Callable<ArrayList<AlienLib.Data>> {
private ArrayList<AlienLib.Garble> data;
private int from;
private int to;
SignalTaskExecutor(ArrayList<AlienLib.Garble> data, int from, int to) {
this.data = data;
this.from = from;
this.to = to;
}
@Override
public ArrayList<AlienLib.Data> call() throws Exception{
ArrayList<AlienLib.Data> result = new ArrayList<AlienLib.Data>();
for (int i = from; i < to; i++) {
result.add(AlienLib.processAlienSignal(data.get(i)));
}
return result;
}
}
}