Java并发编程(九)《Executor框架》

时间:2022-04-03 15:47:50

Java并发编程(九)《Executor框架》

@(并发)

9.1 Executor框架简介

9.1.1 Executor框架的两级调度模型

9.1.1.1 两级调度模型图

Java并发编程(九)《Executor框架》

9.1.1.2 解释

  • 在上层,java多线程程序将应用分解成若干的任务提·交给用户调度器(Executor框架),然后Executor框架将这些任务映射到固定数量的线程处理。
  • 在下层,os将这些固定数量的线程映射到cpu上。

9.1.2 Executor框架的成员

9.1.2.1 任务接口(Runnable或者Callable)

Runnable或者Callable接口的实现类都可以交给ThreadPoolExecutor和ScheduledThreadPoolExecuotr线程池处理:

void execute(Runnable command);
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

9.1.2.2 任务的执行(线程池)

提交的任务由线程池执行,线程池分为ThreadPoolExecutor(立即执行任务的线程池)和ScheduledThreadPoolExecuotr(延时或者定时执行任务的线程池), 线程池的行为由Executor以及继承Execuotr的ExecutorService接口定义.

9.1.2.3 异步计算的结果返回(Future和FutureTask)

当submit提交任务给线程池时,会立即返回Future的对象,如果主线程想要获取任务异步计算完成的结果,可以Future.get()当任务没有执行或者执行中时,主线程会阻塞在Future.get()方法上,当任务异步执行完毕后主线程会被唤醒立即返回。

9.1.3 Executor框架的类与接口UML

![Alt text](./Executor框架类UML 1.png)

9.1.4 Executor框架的的使用

![Alt text](./Executor框架使用示意图 1.png)

要结果返回的向线程池提交任务的过程:

1.将Runnable 任务包装成FutureTask类型的对象,提交给线程池执行,然后返回FutureTask对象

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
     return new FutureTask<T>(callable);
 }

2.通过调用Future.get()方法等待任务异步执行完毕,主线程也可以通过Future.cancel取消任务。

Future<?> future = pool.submit(new Runnable() {
    public void run() {
        System.out.println("poll submit");
    }
});
//当提交的任务还没有执行或者执行中时,主线程会在get方法上阻塞等待
if(future.get()!=null){
     System.out.println("end");
 }

9.2 核心线程池ThreadPoolExcutor

9.2.1 固定线程数的FixedThreadPool

Executors.newFixedThreadPool(10);

public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

9.2.3 单个线程执行的SingleThreadPool

Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

9.2.2 适合执行短期异步任务的CachedThreadPool

Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

9.3 定时调度线程池ScheduledThreadPoolExecutor

9.3.1 简介

  • ScheduledThreadPoolExecutor继承ThreadPoolExecutor,是对一般线程池的扩展,实现了延时运行任务或者定时执行某个任务的功能。
  • 功能和Timer相似,但它更能更加灵活强大。Timer只是单个线程执行,而ScheduledThreadPoolExecutor可以是多个线程并发执行。
  • 它使用DelayQueue*延时队列作为任务队列。

9.3.2 延时和周期性任务

  ScheduledThreadPoolExecutor通过schedule()、scheduleAtFixedRate(),scheduleWithFixedDelay()提交一次性延时和周期性任务:

public class ScheduledThreadPoolTest {

    public static void main(String[] args) {
        //有10个核心线程的定时调度线程池
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);

        /** * 一次性调度任务:延时1s执行任务 */
        scheduledExecutorService.schedule(new ScheduledTask(), 1, TimeUnit.SECONDS);
        /** * 周期性调度任务: * 初始化延时1s执行任务, * 周期为1s:上一次任务执行开始时间到下一次任务开始执行间隔1s(下一个任务不要等前一个任务执行完) */
        scheduledExecutorService.scheduleAtFixedRate(new ScheduledTask(), 1, 1, TimeUnit.SECONDS);

        /** * 周期性调度任务: * 初始化延时1s执行任务, * 周期为1s:上一次任务执行结束时间到下一次任务开始执行间隔1s(下一个任务要等前一个任务执行完) */
        scheduledExecutorService.scheduleWithFixedDelay(new ScheduledTask(), 1, 1, TimeUnit.SECONDS);
    }

    static class ScheduledTask implements Runnable {
        public void run() {
            System.out.println("scheduled task");
        }
    }
}

9.3.3 运行机制

Java并发编程(九)《Executor框架》

  1. 通过调用scheduleAtFixedRate()或者scheduleWithFixedDelay()方法,提交一个ScheduledFutureTask任务给线程池:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                             long initialDelay,
                                             long period,
                                             TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (period <= 0)
       throw new IllegalArgumentException();
   //组装ScheduledFutureTask任务对象
   RunnableScheduledFuture<?> t = decorateTask(command,
       new ScheduledFutureTask<Object>(command,
                                       null,
                                       triggerTime(initialDelay, unit),
                                       unit.toNanos(period)));
   //将ScheduledFutureTask任务对象交给线程池执行
   delayedExecute(t);
   return t;
}

ScheduledFutureTask是ScheduledThreadPool的内部类,是一个具有延时异步特性的任务类,任务接口实现了Delay接口,需要重写getDelay()和compareTo()方法 :

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

     /** * 每个任务延时的时间,当时间没有到达到time时,此任务是不会被取出的 * 同时将任务添加到Delay队列中时,队列会根据time排序,最早的time时间排在最前面,优先取出 */
    private long time;

    /** Sequence number to break ties FIFO */
    /** * 每个任务被创建时唯一的序列号,由线程池上静态变量AtomicLong sequencer线程安全的有序创建。 * 保证任务创建时的有序性FIFO,同时当阻塞队列中两个任务延时时间time一样时,可以根据谁先创建的sequenceNumber序列号排序,小的先被创建先执行 */
    private final long sequenceNumber;
    /** * 任务重复执行的周期性时间 * 正值:fixed-rate execution代表从上个任务开始时间间隔period,周期性重复执行 * 负值:ixed-delay execution代表从上个任务结束时间间隔-period,周期性重复执行 * 0:一次性不重复执行 */
    private final long period;

    /** * 获取任务的延时到期的剩余时间,单位纳秒: * 当getDelay()==0时,代表任务到期了,需要立即执行 */
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

    /** * 任务在延时队列中比较排序的依据: * 延时时间小的排在队列前面,优先被取出 * 当延时时间相同时,看任务的唯一自增序列,先创建的排在前面 */
    public int compareTo(Delayed other) {
        if (other == this) // compare zero ONLY if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) -
                  other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0)? 0 : ((d < 0)? -1 : 1);
    }
}
  1. 当线程池此时的线程数<核心线程数时,创建Worker线程工作;否则直接添加到延时任务队列DelayQueue中,DelayQueue由优先级队列PriorityQueue组成,所以会把提交的任务根据延时时间排序,延时先创建的任务排在最前面:
private void delayedExecute(Runnable command) {
    if (isShutdown()) {
        reject(command);
        return;
    }
    // 当线程池此时的线程数<核心线程数时,创建Worker线程工作
    if (getPoolSize() < getCorePoolSize())
        prestartCoreThread();
    //否则直接添加到延时任务队列DelayQueue中
    super.getQueue().add(command);
}
  1. 线程池里的核心线程并发循环take()延时队列,获取最先延时到期的任务(队头),若队头任务还没到期,则Worker线程take()阻塞等待。到期后,Worker线程运行ScheduledFutureTask.run():
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {

    // ScheduledFutureTask实现了Runable接口,重写了run方法
    public void run() {
        if (isPeriodic())
            runPeriodic();
        else
            ScheduledFutureTask.super.run();
    }
}
  1. Worker线程执行完当前任务后,若执行的是周期性的任务,则计算下次任务开始时间time,重新加入到任务队列中:
//判断是否为周期性任务,若是执行runPeriodic
public boolean isPeriodic() {
           return period != 0;
}
//周期性执行任务实现
private void runPeriodic() {
   //1.先执行当前任务
   boolean ok = ScheduledFutureTask.super.runAndReset();
   boolean down = isShutdown();
   // Reschedule if not cancelled and not shutdown or policy allows
   //2.执行完当前任务后,若任务没被取消,线程池没有关闭,则计算下一次重复任务的运行时间。
   if (ok && (!down ||
              (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
               !isStopped()))) {
       long p = period;
       if (p > 0)
           //当调用scheduleAtFixedRate提交任务时,表示下次任务执行的时间=这次任务的开始时间+period
           time += p;
       else
           //当调用scheduleWithFixedDelay提交任务时,表示下次任务执行的时间=这次任务的结束时间now()+period
           time = triggerTime(-p);

        //3.修改了当前任务下一次执行的时间,重新提交给任务队列中,以便周期性执行
       ScheduledThreadPoolExecutor.super.getQueue().add(this);
   }
   else if (down)
       interruptIdleWorkers();
}

9.4 异步计算结果FutureTask