coderplay
4/8/2013 - 8:18 AM

Using PriorityBlockingQueue in thread pool

Using PriorityBlockingQueue in thread pool

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PBQvsThreadPool {

  static class ComparableFutureTask<V> extends FutureTask<V> implements
      Runnable, Comparable<ComparableFutureTask<V>> {

    private int priority;

    public ComparableFutureTask(Callable<V> callable, int priority) {
      super(callable);
      this.priority = priority;
    }

    public ComparableFutureTask(Runnable runnable, V result, int priority) {
      super(runnable, result);
      this.priority = priority;
    }

    @Override
    public int compareTo(ComparableFutureTask<V> o) {
      return this.priority - o.priority;
    }

  }
  
  static class PBQThreadPoolExecutor extends ThreadPoolExecutor {
    public PBQThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        PriorityBlockingQueue<Runnable> workQueue) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    
    public PBQThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }
    
    public PBQThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler) {
      super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }
    
    public <T> Future<T> submit(Runnable task, T result, int priority) {
      if (task == null)
        throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task, result, priority);
      execute(ftask);
      return ftask;
    }

    public <T> Future<T> submit(Callable<T> task, int priority) {
      if (task == null)
        throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task, priority);
      execute(ftask);
      return ftask;
    }
    
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value,
        int priority) {
      return new ComparableFutureTask<T>(runnable, value, priority);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable,
        int priority) {
      return new ComparableFutureTask<T>(callable, priority);
    }
  }

  public static void main() {
    PriorityBlockingQueue<Runnable> pbq =
        new PriorityBlockingQueue<Runnable>(1024);

    PBQThreadPoolExecutor pool =
        new PBQThreadPoolExecutor(1024, 2048, 1000L, TimeUnit.MILLISECONDS, pbq);
    
    final int priority = 1;

    pool.submit(new Callable<Long>() {
      @Override
      public Long call() throws Exception {
        return 1L;
      }
    }, priority);
  }
}