Spring 定时器schedule实现

时间:2024-01-21 07:21:58

注解方式:
核心类摘要:
1.ScheduledAnnotationBeanPostProcessor
2.ScheduledTaskRegistrar
3.TaskScheduler
4.ReschedulingRunnable
具体说明:
1.ScheduledAnnotationBeanPostProcessor

(1)核心方法:Object postProcessAfterInitialization(final Object bean, String beanName)
功能:负责@Schedule注解的扫描,构建ScheduleTask

(2)核心方法:onApplicationEvent(ContextRefreshedEvent event)
功能:spring容器加载完毕之后调用,ScheduleTask向ScheduledTaskRegistrar中注册, 调用ScheduledTaskRegistrar.afterPropertiesSet()

2.ScheduledTaskRegistrar

(1)核心方法:void afterPropertiesSet()
功能:初始化所有定时器,启动定时器

3.TaskScheduler
主要的实现类有三个 ThreadPoolTaskScheduler, ConcurrentTaskScheduler,TimerManagerTaskScheduler
作用:这些类的作用主要是将task和executor用ReschedulingRunnable包装起来进行生命周期管理。

(1)核心方法:ScheduledFuture schedule(Runnable task, Trigger trigger)

4.ReschedulingRunnable
(1)核心方法:public ScheduledFuture schedule()
(2)核心方法:public void run()

public ScheduledFuture schedule() {
        synchronized (this.triggerContextMonitor) {
            this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
            if (this.scheduledExecutionTime == null) {
                return null;
            }
            long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
            this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
            return this;
        }
    }

    @Override
    public void run() {
        Date actualExecutionTime = new Date();
        super.run();
        Date completionTime = new Date();
        synchronized (this.triggerContextMonitor) {
            this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
        }
        if (!this.currentFuture.isCancelled()) {
            schedule();
        }
    }

通过schedule方法及run方法互相调用,再利用ScheduledExecutorService接口的schedule(Runnable command,long delay,TimeUnit unit)单次执行效果,从而实现一定时间重复触发的效果。

配置文件的方式基本相似只是通过ScheduledTasksBeanDefinitionParser类读取节点组装对应定时任务bean
Spring定时器的实现

ScheduledThreadPoolExecutor类

 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
        delayedExecute(t);//延迟执行
        return t;
    }

    //执行到delaydExecute方法
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);//把任务加入到执行队列中
            if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                task.cancel(false);
            else
                ensurePrestart();//任务未取消则调用
        }
    }

任务未取消则调用ensurePrestart(),ensurePrestart方法中调用了addWorker()方法,addWorker()方法中创建执行任务的Woker并且调用woker的run方法,调用runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker方法中的任务通过调用了ThreadPoolExecutor类中的getTask方法获得
getTask()方法中调用了ScheduledThreadPoolExecutor内部类DelayedWorkQueue重写的take方法或poll方法

public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //实现延迟执行,根据延迟时间等待直到执行时间返回RunnableScheduledFuture
            for (; ; ) {
                RunnableScheduledFuture<?> first = queue[0];
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return finishPoll(first);
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }