实现运行在独立线程池的调度功能,基于Spring和Annotation

时间:2023-03-09 07:05:33
实现运行在独立线程池的调度功能,基于Spring和Annotation

使用Spring的注解(@Scheduled)声明多个调度的时候,由于其默认实现机制,将导致多个调度方法之间相互干扰(简单理解就是调度不按配置的时间点执行).

为了解决该问题尝试了修改线程池大小,但是治标不治本,原因就是Spring注解方式都在一个相同的线程池抢夺线程资源并且还存在线性执行的问题(这个不是很确定,但是感觉存在这问题).

由于以上原因,决定自行实现简单的调度机制,目前该机制已经上线,经过几次修正,截止目前已经稳定的为系统提供服务.

实现多线程互不干扰的调度需要解决如下几个关键点:

1. 调度程序核心机制怎么实现(如何让某个方法在合适的时间点执行)?

2. 如何保证调度之间相互独立执行(关键问题)?

3. 通过什么方式来配置调度相关信息?

针对以上几个点,采取了如下方式:

1. 调度核心使用jdk的ScheduledExecutorService接口实现

2. 基于1的实现方式,为每个调度获取不同的线程池,并将相关信息登记入册,以供后续使用

3. 通过注解方式声明调度的相关信息,包括间隔时间、初始化执行时间、时间间隔单位等基本属性

首先,来看第一个知识点ScheduledExecutorService,如果您了解该接口,可以跳过该段.

jdk其实自带了调度工具叫做java.util.concurrent.ScheduledExecutorService.通过该工具可以实现与spring scheduled和quartz相同的调度功能,并且可灵活控制.所以决定使用该方式进行尝试.

查看该接口,可以发现一共定义了几个方法

实现运行在独立线程池的调度功能,基于Spring和Annotation

这里我只关心最后一个方法,用来实现上一次调度完成后多久再次执行

实现运行在独立线程池的调度功能,基于Spring和Annotation

有了这个就好办了,直接将原有的注解去掉,在合适的地方通过java.util.concurrent.Executors.newSingleThreadExecutor()方法获取ScheduledExecutorService接口实例.

具体参数不做过多解释,只简单说说第一个参数Runnable command,这个参数传递的就是我们具体要执行的代码逻辑,需要将调度代码放在实现了Runnable接口的实例中的run方法中(有点绕).其实有更好的办法,直接使用一个叫做ScheduledMethodRunnable的类,将某个方法包装成Runnable接口实现类也可以.

ScheduledMethodRunnable类的实现原理就是通过反射动态调用,但是这里要求所包装的方法不能有返回值和参数.

有了以上信息,实现调度就没问题了,通过制定间隔执行时间,就可以达到定期执行的目的了.

然后,来看第二个知识点,这个就比较简单了,为每个调度单独创建一个线程池,并将各种信息登记入册

只需要两部就可以完成:

 ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);//Executors.newSingleThreadScheduledExecutor()也可以
ScheduledFuture<?> sf = ses.scheduleWithFixedDelay(runnableMethod, 初始时间,间隔时间, 时间单位);

最后,来看如何配置,这里我们是基于Spring实现的机制,因此学习了与Spring原生调度相同的注解方式

首先定义注解,直接看代码不解释

实现运行在独立线程池的调度功能,基于Spring和Annotation

然后定义一个注解处理器,处理器负责解析注解并为注解的方法创建调度.

这里的重点是Spring的几个接口BeanPostProcessor, DisposableBean, ApplicationListener<ContextRefreshedEvent>.

我们需要在Bean注册的过程中发现实现了目标注解的所有方法并一一记录,在Spring容器加载完成的时候开始执行所有记录下来的方法,最终在Spring容器释放的时候关闭登记的所有调度处理程序(必须关闭,否则线程不释放).

BeanPostProcessor:Spring容器初始化过程中扫描到Bean会通知该接口实现类

ApplicationListener<ContextRefreshedEvent>:会接受Spring容器发起的ContextRefreshedEvent事件,收到该事件说明容器已经初始化完成,我们可以继续做后续工作.第一版的时候没有使用该事件,直接在BeanPostProcessor接口中启动了调度,导致了各种奇葩问题出现,之后才将具体的调度启动逻辑挪到这里(spring调度也是这么干的,抄袭抄袭)

DisposableBean:Bean释放的接口,当应用容器关闭的时候会触发该接口,我们需要释放已经启动的相关调度.这里也踩过坑,因为开始不知道释放ScheduledExecutorService实例,导致应用容器残留线程在内存中,产生了一大堆奇怪问题,最后找到这里(才发现是未释放之前启动的线程池造成的)

不要忘了将实现了以上接口的类注册到Spring容器中,xml配置也好注解配置也罢,加载上为目的.

最后附上相关源码:

 import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME; import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit; @Documented
@Retention(RUNTIME)
@Target(METHOD)
public @interface AdvanceDelayScheduled { public long initialDelay() default 0; public long fixedDelay(); public TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}
 import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Scope;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.scheduling.support.ScheduledMethodRunnable;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils; @Component
@Scope("singleton")
public class AdvanceDelayScheduledProcessor
implements BeanPostProcessor, DisposableBean, ApplicationListener<ContextRefreshedEvent> { private static final Logger logger = Logger.getLogger(AdvanceDelayScheduledProcessor.class);
private final Set<Class<?>> nonAnnotatedClasses = Collections
.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64)); @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = AopUtils.getTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) {
Map<Method, Set<AdvanceDelayScheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
new MethodIntrospector.MetadataLookup<Set<AdvanceDelayScheduled>>() {
@Override
public Set<AdvanceDelayScheduled> inspect(Method method) {
Set<AdvanceDelayScheduled> scheduledMethods = AnnotationUtils
.getRepeatableAnnotations(method, AdvanceDelayScheduled.class);
return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
}
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @AdvanceDelayScheduled annotations found on bean class: " + bean.getClass());
}
} else {
// Non-empty set of methods
for (Map.Entry<Method, Set<AdvanceDelayScheduled>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (AdvanceDelayScheduled ads : entry.getValue()) {
regScheduledItem(ads, method, bean);
}
}
logger.info("发现调度@AdvanceDelayScheduled:" + annotatedMethods.size() + " methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
}
return bean;
} @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
} /**
* 注册调度到列表,在spring加载完成后统一启动
*
* @param scheduled
* @param method
* @param bean
*/
protected void regScheduledItem(AdvanceDelayScheduled scheduled, Method method, Object bean) { Assert.isTrue(void.class == method.getReturnType(),
"Only void-returning methods may be annotated with @AdvanceDelayScheduled");
Assert.isTrue(method.getParameterTypes().length == 0,
"Only no-arg methods may be annotated with @AdvanceDelayScheduled"); if (AopUtils.isJdkDynamicProxy(bean)) {
try {
// Found a @AdvanceDelayScheduled method on the target class for
// this JDK
// proxy ->
// is it also present on the proxy itself?
method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
} catch (SecurityException ex) {
ReflectionUtils.handleReflectionException(ex);
} catch (NoSuchMethodException ex) {
throw new IllegalStateException(String.format(
"@AdvanceDelayScheduled method '%s' found on bean target class '%s' but not "
+ "found in any interface(s) for a dynamic proxy. Either pull the "
+ "method up to a declared interface or switch to subclass (CGLIB) "
+ "proxies by setting proxy-target-class/proxyTargetClass to 'true'.",
method.getName(), method.getDeclaringClass().getSimpleName()));
}
} else if (AopUtils.isCglibProxy(bean)) {
// Common problem: private methods end up in the proxy instance, not
// getting
// delegated.
if (Modifier.isPrivate(method.getModifiers())) {
throw new IllegalStateException(String.format(
"@AdvanceDelayScheduled method '%s' found on CGLIB proxy for target class '%s' but cannot "
+ "be delegated to target bean. Switch its visibility to package or protected.",
method.getName(), method.getDeclaringClass().getSimpleName()));
}
} ScheduledMethodRunnable runnable = new ScheduledMethodRunnable(bean, method); // 注册到调度列表,等待加载完成再执行
scheduledItems.add(new ScheduledItem(method.getName(), runnable, scheduled)); } private List<ScheduledItem> scheduledItems = new ArrayList<>(); @Override
public void destroy() throws Exception {
// 释放所有调度任务
scheduledItems.forEach(item -> {
try {
logger.info("关闭ScheduledExecutorService:" + item.getScheduledExecutorService());
item.getScheduledExecutorService().shutdown();
try {
if (!item.getScheduledExecutorService().awaitTermination(5, TimeUnit.SECONDS))
item.getScheduledExecutorService().shutdownNow();
} catch (Exception e) {
item.getScheduledExecutorService().shutdownNow();
throw e;
}
} catch (Exception e) {
logger.error("关闭ScheduledExecutorService失败," + e.getMessage(), e);
}
});
} @Override
public void onApplicationEvent(ContextRefreshedEvent event) {
// 在这里启动所有调度
if (event.getApplicationContext().getParent() != null) {
return;
} scheduledItems.forEach(item -> {
try {
AdvanceDelayScheduled ads = item.getAdvanceDelayScheduled();
// 每次都重新创建一个调度器
ScheduledExecutorService ses = Executors.newScheduledThreadPool(item.getPoolSize());
ScheduledFuture<?> sf = ses.scheduleWithFixedDelay(item.getRunnable(), ads.initialDelay(),
ads.fixedDelay(), ads.timeUnit());
item.setScheduledExecutorService(ses);
item.setScheduledFuture(sf);
} catch (Exception e) {
logger.error("启动调度失败,调度名称[" + item.getName() + "]," + e.getMessage(), e);
}
}); } protected class ScheduledItem {
private String name;
private Runnable runnable;
private AdvanceDelayScheduled advanceDelayScheduled;
private int poolSize = 1;
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> scheduledFuture; public ScheduledItem(String name, Runnable runnable, AdvanceDelayScheduled advanceDelayScheduled) {
super();
this.name = name;
this.runnable = runnable;
this.advanceDelayScheduled = advanceDelayScheduled;
} public ScheduledItem() {
super();
// TODO Auto-generated constructor stub
} /**
* @return the name
*/
public String getName() {
return name;
} /**
* @param name
* the name to set
*/
public void setName(String name) {
this.name = name;
} /**
* @return the runnable
*/
public Runnable getRunnable() {
return runnable;
} /**
* @param runnable
* the runnable to set
*/
public void setRunnable(Runnable runnable) {
this.runnable = runnable;
} /**
* @return the advanceDelayScheduled
*/
public AdvanceDelayScheduled getAdvanceDelayScheduled() {
return advanceDelayScheduled;
} /**
* @param advanceDelayScheduled
* the advanceDelayScheduled to set
*/
public void setAdvanceDelayScheduled(AdvanceDelayScheduled advanceDelayScheduled) {
this.advanceDelayScheduled = advanceDelayScheduled;
} /**
* @return the poolSize
*/
public int getPoolSize() {
return poolSize;
} /**
* @param poolSize
* the poolSize to set
*/
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
} /**
* @return the scheduledFuture
*/
public ScheduledFuture<?> getScheduledFuture() {
return scheduledFuture;
} /**
* @param scheduledFuture
* the scheduledFuture to set
*/
public void setScheduledFuture(ScheduledFuture<?> scheduledFuture) {
this.scheduledFuture = scheduledFuture;
} /**
* @return the scheduledExecutorService
*/
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
} /**
* @param scheduledExecutorService
* the scheduledExecutorService to set
*/
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
this.scheduledExecutorService = scheduledExecutorService;
} } }