自定义定时任务(从数据库读定时任务)

时间:2022-01-21 07:46:13

  最近项目经历,需要自定义增加定时发送任务,于是学习了下定时任务的知识,Elastic-Job 是基于成熟的开源产品Quartz和Zookeeper及其客户端Curator进行二次开发。那么我要实现一个和Elastic-Job 一样的自定义任务也应该基于Quartz

于是学习了下Quartz结合springboot的自定义任务。

1、任务记录入库,启动服务后读取该任务,并加入或删除或更新job计划。

2、定时任务执行指定任务。

指定任务是这类定时任务指定的是同样的方法或者一类方法 

具体代码思路如下

依赖的java包

        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context-support</artifactId>
            <version>3.2.17.RELEASE</version>
        </dependency>

 

a 配置的xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">

    <bean id="taskExecutor"
          class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="20"/>
        <property name="queueCapacity" value="100"/>
        <property name="keepAliveSeconds" value="2000"/>
        <property name="rejectedExecutionHandler">
            <bean class="java.util.concurrent.ThreadPoolExecutor$AbortPolicy"/>
        </property>
    </bean>

    <bean id="quartzManager" class="com.ql.vessels.schemajob.QuartzManager">
        <property name="scheduler" ref="schedulerManager"/>
    </bean>
    <bean id="quartzManagerJobDetail"
          class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
        <property name="targetObject" ref="quartzManager"/>
        <property name="targetMethod" value="reScheduleJob"/>
        <property name="concurrent" value="false"/>
    </bean>
    <bean id="cronTriggerBean" class="org.springframework.scheduling.quartz.SimpleTriggerBean">
        <property name="jobDetail" ref="quartzManagerJobDetail"/>
        <!-- 延时0秒 执行任务 -->
        <property name="startDelay" value="0"/>
        <!-- 任务执行周期 1min -->
        <property name="repeatInterval" value="60000"/>
    </bean>
    <!-- 总管理类 如果将lazy-init='false'那么容器启动就会执行调度程序  -->
    <bean id="schedulerManager" lazy-init="false" autowire="no"
          class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
        <property name="triggers">
            <list>
                <ref bean="cronTriggerBean"/>
            </list>
        </property>
        <property name="jobFactory" ref="jobFactory"></property>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>
    <!--注意标记有背景色的这两行很重要是为了在执行方法中能够注入到需要的service-->
    <bean id="jobFactory" class="com.ql.vessels.schemajob.JobAdapter"></bean>


</beans>

b 这个是关键代码,

  从数据库读任务

  增加到job计划

  删除过时或者以取消的任务

  更新任务

package com.ql.vessels.schemajob;

import com.fqgj.log.factory.LogFactory;
import com.fqgj.log.interfaces.Log;

import com.ql.vessels.common.util.DateUtils;
import com.ql.vessels.domain.services.MsgSendSchemaService;
import com.ql.vessels.repo.entity.MsgSendSchemaEntity;
import com.ql.vessels.repo.entity.PushSchemaEntity;
import com.ql.vessels.repo.vo.TaskVo;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.CronTriggerBean;

import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author vampire
 * @date 2019/5/16 10:08 AM
 */
public class QuartzManager implements BeanFactoryAware {

    private final static Log log = LogFactory.getLog(QuartzManager.class);

    private Scheduler scheduler;

    private static BeanFactory beanFactory;

    @Autowired
    MsgSendSchemaService msgSendSchemaService;


    /**
     * 定时要执行的方法类。
     */
    public void reScheduleJob() {
        // 1.读取数据库中的任务列表。
        Date now = new Date();
        Map map = new HashMap(4);
        map.put("sendType", 2);
        //获取短信定时任务
        List<MsgSendSchemaEntity> list = msgSendSchemaService.selectByParams(map);
        for (MsgSendSchemaEntity sendSchema : list
                ) {
            TaskVo taskVo = new TaskVo();
            taskVo.setTaskId(sendSchema.getId());
            taskVo.setTaskType(1);
            taskVo.setCronExpression(DateUtils.getCron(sendSchema.getSendTime()));
            taskVo.setState(sendSchema.getStatus());
            if (sendSchema.getStatus() != 0 || now.after(sendSchema.getSendTime())) {
                //去掉过时的任务
                removeExpireTasks(taskVo);
            } else {
                configSchedul(taskVo);
            }
        }
       
    }


    /**
     * 移除过期任务
     *
     * @param bo
     */
    private void removeExpireTasks(TaskVo bo) {
        try {
            CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP);
            if (trigger != null) {
                log.info("==移除任务==" + bo.getTaskId() + "name" + bo.getTaskType());
                scheduler.pauseTrigger(trigger.getName(), trigger.getGroup());// 停止触发器
                scheduler.unscheduleJob(trigger.getName(), trigger.getGroup());// 移除触发器
                scheduler.deleteJob(trigger.getJobName(), trigger.getJobGroup());// 删除任务
            }
        } catch (SchedulerException e) {
            log.error("移除任务失败...");
            e.printStackTrace();
        }
    }


    /**
     * 配置任务列表
     *
     * @param bo
     */
    private void configSchedul(TaskVo bo) {
        try {
            CronTriggerBean trigger = (CronTriggerBean) scheduler.getTrigger(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP);
            if (trigger == null) {//说明schedule中不存在该定时任务
                createTriggerTask(bo);
            } else {
                updateTriggerTask(bo, trigger);
            }
        } catch (SchedulerException e) {
            log.error("获取触发器trigger失败...");
            e.printStackTrace();
        }
    }


    /**
     * 更新任务列表
     *
     * @param bo
     */
    private void updateTriggerTask(TaskVo bo, CronTriggerBean trigger) {
        if (bo.getState() == 0) {
            try {
                // 判断从DB中取得的任务时间和现在的quartz线程中的任务时间是否相等
                // 如果相等,则表示用户并没有重新设定数据库中的任务时间,这种情况不需要重新rescheduleJob
                if (trigger.getCronExpression() != null &&
                        !trigger.getCronExpression().equalsIgnoreCase(bo.getCronExpression())) {
                    log.info("=真正更新方法:=" + bo.getTaskId() + "name" + bo.getTaskType());
                    trigger.setCronExpression(bo.getCronExpression());
                    scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, trigger);
                    log.info("更新任务时间失败...");
                }
            } catch (Exception e) {
                log.error("更新任务时间失败...");
                e.printStackTrace();
            }
        } else {
            this.removeExpireTasks(bo);
        }
    }

    /**
     * 创建任务列表
     *
     * @param bo
     */
    private void createTriggerTask(TaskVo bo) {
        if (bo.getState() == 0) {
            log.info("=创建:=" + bo.getTaskId() + "name" + bo.getTaskType());

            try {
                Class clazz = QuartzJobFactory.class;//执行计划任务的类
                JobDetail jobDetail = new JobDetail(bo.getTaskId() + "", clazz);
                Map map = new HashMap();
                map.put("task", bo);
                jobDetail.setJobDataMap(new JobDataMap(map));
                jobDetail.setName(bo.getTaskId() + "name" + bo.getTaskType());
                scheduler.addJob(jobDetail, true); // 将Job添加到管理类

                // 新一个基于Spring的时间类
                CronTriggerBean c = new CronTriggerBean();
                c.setCronExpression(bo.getCronExpression());// 设置时间表达式
                c.setName(bo.getTaskId() + "name" + bo.getTaskType());// 设置名称
                c.setJobDetail(jobDetail);// 注入Job
                c.setJobName(bo.getTaskId() + "name" + bo.getTaskType());// 设置Job名称
                scheduler.scheduleJob(c);// 注入到管理类
                scheduler.rescheduleJob(bo.getTaskId() + "name" + bo.getTaskType(), Scheduler.DEFAULT_GROUP, c);// 刷新管理类
            } catch (Exception e) {
                log.error("创建" + bo.getTaskId() + "name" + bo.getTaskType() + "任务失败...");
                e.printStackTrace();
            }
        } else {
            this.removeExpireTasks(bo);
        }
    }


    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;

    }

    public Scheduler getScheduler() {
        return scheduler;
    }

    public QuartzManager setScheduler(Scheduler scheduler) {
        this.scheduler = scheduler;
        return this;
    }

    public static BeanFactory getBeanFactory() {
        return beanFactory;
    }
}

c 任务添加后那每一个任务真正执行的方法是什么?需要我们来写任务具体做的事

所有的定时任务都会触发到这个 execute 方法中,根据定时任务创建时传入的参数来调用这个执行中的具体业务方法,

注意 :@Autowired 注入服务时空指针还需要添加下边一个类,刚刚在xml中也提到过,背景色标注的那两行就是为解决此问题的

package com.ql.vessels.schemajob;

import com.alibaba.fastjson.JSON;
import com.fqgj.log.factory.LogFactory;
import com.fqgj.log.interfaces.Log;
import com.ql.vessels.domain.services.MsgSendSchemaService;
import com.ql.vessels.domain.services.PushSchemaService;
import com.ql.vessels.repo.vo.TaskVo;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author vampire
 * @date 2019/5/16 11:56 AM
 */

@Component
public class QuartzJobFactory implements Job {
    private final static Log log = LogFactory.getLog(QuartzJobFactory.class);

    @Autowired
    MsgSendSchemaService msgSendSchemaService;
    @Autowired
    PushSchemaService pushSchemaService;


    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {

        JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        TaskVo taskVo = (TaskVo) jobDataMap.get("task");
        log.info("定时任务开始执行。。。。。。。。。。。" + JSON.toJSONString(taskVo));
        if (taskVo.getTaskType() == 1) {
            log.info("message job=======" + taskVo.getTaskId());

            msgSendSchemaService.sendSchemaMsg(taskVo.getTaskId());
        } else if (taskVo.getTaskType() == 2) {
            //推送任务执行
            log.info("push job=======" + taskVo.getTaskId());
            pushSchemaService.sendSchemaPush(taskVo.getTaskId());
    }

    }
}

 

d 解决服务注入为空

package com.ql.vessels.schemajob;

import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;

/**
 * @author vampire
 * @date 2019/5/16 4:12 PM
 * 作用是解决QuartzJobFactory 类中不能注入服务类
 */
public class JobAdapter extends AdaptableJobFactory {
    @Autowired
    private AutowireCapableBeanFactory capableBeanFactory;

    @Override
    protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
        //调用父类的方法
        Object jobInstance = super.createJobInstance(bundle);
        //进行注入
        capableBeanFactory.autowireBean(jobInstance);
        return jobInstance;
    }
}

 

 

到此就完美解决了自定义任务,当然服务器是单台的没有考虑到集群,若集群的需要添加zk,跑任务有且只能有一个服务器去跑,负责重复重复后果很严重。