spring的Scheduled(定时任务)和多线程

时间:2022-06-25 08:34:52

一、前言

在我们日常的开发中,经常用到数据同步的更新,这时我们采用的是spring的定时任务和java的多线程进行数据的更新,进行时实的服务调用。

二.实现思路

           1.创建线程类

           2.创建ExecutorService线程连接池

           3.调用线程池操作

           4.spring的Scheduled(定时任务)配置 

三.创建线程类

创建的线程类,我们采用的是实现Runnable接口,使用该类可以共享,在线程中要么继承Thread,或者实现Runnable接口,其中Thread是实现了Runnable的接口,实现Runnable接口可以更好的扩展和共享类,因为java是单继承多实现的程序。

public class ArchiveTask implements Runnable {

    /**
* log.
*/
private static Logger logger = LoggerFactory.getLogger(ArchiveTask.class); //注:由于spring事务配置采用的注解的方式来配置传播性配置,此类没有在spring传播配置里面,所以不能用自动装配的方式来配置
private static FaxRecvArchiveCtrl faxRecvArchiveCtrl = AppContextAware.getBean(FaxRecvArchiveCtrl.class); private FaxRecvArchiveDTO faxRecvArchiveDTO; /**
* CountDownLatch是一个同步辅助类,在一组线程在执行操作之前,允许一个或多线程处于等待
* 主要方法
* public CountDownLatch(int count);
* public void countDown();
* public void await() throws InterruptedException
*
* 构造方法参数指定了计数的次数
* countDown方法,当前线程调用此方法,则计数减一
* awaint方法,调用此方法会一直阻塞当前线程,直到计时器的值为0
*
*/
private CountDownLatch latch; public ArchiveTask(FaxRecvArchiveDTO faxRecvArchiveDTO, CountDownLatch latch) {
this.faxRecvArchiveDTO = faxRecvArchiveDTO;
this.latch = latch;
} @Override
public void run() {
try {
faxRecvArchiveCtrl.updateArchiveFileFlag(ServiceFactory.getCurrentSessionId(), faxRecvArchiveDTO);
} catch (Exception ex) {
logger.debug("["+this.faxRecvArchiveDTO.getFaxKey()+"]线程执行异常,错误信息为:"+ex.getMessage());
} finally{
logger.debug("线程执行完:"+this.faxRecvArchiveDTO.getFaxKey());
latch.countDown();
}
}
}

 四、调用线程池操作

public class EasyFaxArchiveFileMgr {
/**
* 日志
*/
Logger logger = LoggerFactory.getLogger(EasyFaxArchiveFileMgr.class); /**
* 线程池数量
*/
private int ThreadNum = 5; private static FaxRecvArchiveCtrl faxRecvArchiveCtrl = AppContextAware.getBean(FaxRecvArchiveCtrl.class); private static List<FaxRecvArchiveDTO> noArchiveList = new ArrayList<FaxRecvArchiveDTO>();

public void archiveFile() throws ShineException{
noArchiveList = faxRecvArchiveCtrl.getFaxRecvNoArchiveFile(); ExecutorService executorService = Executors.newFixedThreadPool(ThreadNum); List<ArchiveTask> taskList = new ArrayList<ArchiveTask>();
synchronized (noArchiveList) {
while(CollectionUtils.isEmpty(noArchiveList) == false) { try {
//定义计数器
int countLatch = (noArchiveList.size() > ThreadNum) ? ThreadNum :noArchiveList.size(); CountDownLatch latch = new CountDownLatch(countLatch); //组织任务
for(int i = 0; i < ThreadNum; i++){
if(CollectionUtils.isEmpty(noArchiveList) == false) {
taskList.add(new ArchiveTask(noArchiveList.remove(0),latch));
}else{
break;
}
}
//提交任务
if (CollectionUtils.isEmpty(taskList) == false) {
for (int i = 0, size = taskList.size(); i < size; i++) {
executorService.submit(taskList.get(i));
}
}//等待所有线程都执行完
latch.await();
} catch(Exception ex) {
logger.error("线程池处理出错,原因:"+ex.getMessage());
} finally {
taskList.clear();
}
//关闭线程连接池
executorService.shutdown();
}
} }
}

五.spring配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-2.5.xsd"> <bean id="timerFactory" class="org.springframework.scheduling.timer.TimerFactoryBean">
<property name="scheduledTimerTasks">
<list>
<ref bean="archiveFileScheduler"/>
</list>
</property>
</bean>
<bean id="archiveFileScheduler" class="org.springframework.scheduling.timer.ScheduledTimerTask">
<property name="delay">
<!-- 启动30秒后开始运行 -->
<value>30000</value>
</property>
<property name="period">
<!-- 每10秒运行 -->
<value>10000</value>
</property>
<property name="timerTask">
<ref bean="archiveFileHandleMethodInvoker" />
</property>
</bean> <bean id="archiveFileHandleMethodInvoker" class="org.springframework.scheduling.timer.MethodInvokingTimerTaskFactoryBean">
<property name="targetObject">
<ref bean="com.shine.emofs.thirdparty.easyfax.ctrl.EasyFaxArchiveFileMgr" />
</property>
<property name="targetMethod">
<value>archiveFile</value>
</property>
</bean>
</beans>

六、参考:

ExecutorService线程连接池:http://www.itzhai.com/the-executorservice-common-method-newfixedthreadpool-of-create-fixed-size-thread-pool.html#read-more