摘自《Java编程思想》
package com.test.concurrency; import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit; class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>
{
private Random rand = new Random(47);
private static int counter = 0;
private final int id = counter++;
private final int priority;// 用于表示当前任务的执行优先级 protected static List<PrioritizedTask> sequence = new ArrayList<PrioritizedTask>(); public PrioritizedTask(int priority)
{
this.priority = priority;
sequence.add(this);
}
/**
* 用于比较任务的优先级
*/
@Override
public int compareTo(PrioritizedTask o)
{
return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0);
} @Override
public void run()
{
try
{
TimeUnit.MILLISECONDS.sleep(rand.nextInt(250));
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println(this);
} @Override
public String toString()
{
return String.format("[%1$-3d]", priority) + " Task " + id;
} public String summary()
{
return "(" + id + ":" + priority + ")";
} } /**
* 该任务负责展示当前sequence中的任务与正常执行的任务进行对比以及停止线程
*/
class EndSentinel extends PrioritizedTask
{
private ExecutorService exec; public EndSentinel(ExecutorService e)
{
super(-1);
exec = e;
} public void run()
{
int count = 0;
for (PrioritizedTask pt : sequence)
{
System.out.println(pt.summary());
if (++count % 5 == 0)
{
System.out.println();
}
}
System.out.println();
System.out.println(this + " Calling shutdownNow");
exec.shutdownNow();
}
} /**
*负责生成任务,分别造不同级别的任务添加到执行队列中
*
*/
class PrioritizedTaskProduncer implements Runnable
{
private Random rand = new Random(47);
private Queue<Runnable> queue;
private ExecutorService exec; public PrioritizedTaskProduncer(Queue<Runnable> q, ExecutorService e)
{
queue = q;
exec = e;
} @Override
public void run()
{
//20个不同优先级的任务,通过随机数来产生
for (int i = 0; i < 20; i++)
{
queue.add(new PrioritizedTask(rand.nextInt(10)));
Thread.yield();
}
try
{
//20个优先级高的任务
for (int i = 0; i < 10; i++)
{ TimeUnit.MILLISECONDS.sleep(250);
queue.add(new PrioritizedTask(10));
}
//优先级0-9的执行任务
for (int i = 0; i < 10; i++)
{
queue.add(new PrioritizedTask(i));
} queue.add(new EndSentinel(exec));
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Finished PrioritizedTaskProducer");
}
}
/**
*负责消费任务
*/
class PrioritizedTaskConsumer implements Runnable
{
private PriorityBlockingQueue<Runnable> q; public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> q)
{
this.q = q;
} @Override
public void run()
{
try
{
while (!Thread.interrupted())
{
q.take().run();
}
}
catch (InterruptedException e)
{
e.printStackTrace();
}
System.out.println("Finished PrioritizedTaskConsumer");
}
} public class PriorityBlockingQueueDemo
{
public static void main(String[] args) throws InterruptedException
{
ExecutorService exec = Executors.newCachedThreadPool();
PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
exec.execute(new PrioritizedTaskProduncer(queue, exec));
//此处需特别注意,需要等待所有的任务已经生成完后,再执行消费,否则,可能会看到消费并非按照优先级执行的
Thread.sleep(5000);
exec.execute(new PrioritizedTaskConsumer(queue));
}
}