Zeus Source Code Walkthrough: How Scheduled and Manually Triggered Jobs Are Executed

Date: 2021-09-12 07:52:10

Dependency-driven job execution in a Zeus cluster

In addition to scheduling jobs independently, Zeus supports complex dependencies between jobs, as shown in Figure 1:
Figure 1

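A is the root job. B and C depend on A, and D depends on both B and C. B and C run only after A has succeeded, and D runs only after both B and C have succeeded. If C fails, D does not run.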
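The rule described above (a job becomes runnable only once all of its upstream jobs have succeeded) can be sketched in a few lines. This is a minimal illustration, not Zeus code: the class `DependencySketch` and its methods are hypothetical names, and the graph is hard-coded to match Figure 1.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; none of these names come from the Zeus code base.
final class DependencySketch {

    // Upstream jobs of each job, mirroring Figure 1: B and C depend on A, D on B and C.
    static Map<String, List<String>> figureOneGraph() {
        Map<String, List<String>> deps = new HashMap<>();
        deps.put("A", Collections.<String>emptyList());
        deps.put("B", Arrays.asList("A"));
        deps.put("C", Arrays.asList("A"));
        deps.put("D", Arrays.asList("B", "C"));
        return deps;
    }

    // A known job may run only when every one of its upstream jobs has succeeded.
    static boolean canRun(String jobId, Map<String, List<String>> deps, Set<String> succeeded) {
        List<String> upstream = deps.get(jobId);
        return upstream != null && succeeded.containsAll(upstream);
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = figureOneGraph();
        Set<String> succeeded = new HashSet<>(Arrays.asList("A", "B"));
        System.out.println("D runnable after A and B: " + canRun("D", deps, succeeded));
        succeeded.add("C");
        System.out.println("D runnable after A, B and C: " + canRun("D", deps, succeeded));
    }
}
```

A failed C never enters the `succeeded` set, so D stays blocked, which matches the behaviour described for Figure 1.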

Source code walkthrough: DAG execution of dependent jobs in a Zeus cluster

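Zeus runs its scheduled jobs on Quartz. In Quartz, a task must implement the Job interface before a Scheduler can execute it, so a natural starting point is to find the Zeus class that implements Quartz's Job interface. That class is TimerJob, a static nested class of JobController, defined as follows: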

public class JobController extends Controller {
    ...........

    public static class TimerJob implements Job {
        @Override
        public void execute(JobExecutionContext context)
                throws JobExecutionException {
            String jobId = context.getJobDetail().getJobDataMap()
                    .getString("jobId");
            Dispatcher dispatcher = (Dispatcher) context.getJobDetail()
                    .getJobDataMap().get("dispatcher");
            ScheduleInfoLog.info("start the triggerEvent, the jobId = " + jobId);
            ScheduleTriggerEvent ste = new ScheduleTriggerEvent(jobId);
            dispatcher.forwardEvent(ste);
        }

    }

    ............
}

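execute(JobExecutionContext context) first reads the job's jobId from the JobExecutionContext of the scheduled job, then obtains Zeus's event dispatcher, Dispatcher, from the same JobDataMap. It creates a ScheduleTriggerEvent, which marks the event as raised by the timer, and hands it to the dispatcher. dispatcher.forwardEvent(ste) is implemented as follows: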

public class Dispatcher extends BaseObservable {

    public static final EventType BeforeDispatch = new EventType();

    public static final EventType AfterDispatch = new EventType();

    private Map<String, AppEvent> history;

    private List<Controller> controllers;

    /**
     * Forwards an application event to the dispatcher.
     *
     * @param event the application event
     */
    public void forwardEvent(AppEvent event) {
        dispatch(event);
    }

    private void dispatch(AppEvent event) {
        try {
            MvcEvent e = new MvcEvent(this, event);
            e.setAppEvent(event);
            if (fireEvent(BeforeDispatch, e)) {
                List<Controller> copy = new ArrayList<Controller>(controllers);
                for (Controller controller : copy) {
                    if (controller.canHandle(event)) {
                        if (!controller.initialized) {
                            controller.initialized = true;
                            controller.initialize();
                        }
                        controller.handleEvent(event);
                    }
                }
                fireEvent(AfterDispatch, e);
            }
        } catch (Exception e) {
            ScheduleInfoLog.error("dispatch error", e);
        }
    }
}
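The dispatch loop above has three notable properties: it iterates over a copy of the controller list, it initializes a controller lazily the first time that controller accepts an event, and it offers every event to every controller. The following is a simplified sketch of that pattern only. `MiniController` and `MiniDispatcher` are hypothetical stand-ins, events are plain strings, and the BeforeDispatch/AfterDispatch hooks are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for Zeus's Controller and Dispatcher; events are plain strings here.
final class DispatchSketch {

    abstract static class MiniController {
        boolean initialized;
        int initCount;
        final List<String> handled = new ArrayList<>();

        abstract boolean canHandle(String event);

        void initialize() {
            initCount++;
        }

        void handleEvent(String event) {
            handled.add(event);
        }
    }

    static final class MiniDispatcher {
        private final List<MiniController> controllers = new ArrayList<>();

        void addController(MiniController controller) {
            controllers.add(controller);
        }

        void forwardEvent(String event) {
            // Iterate over a snapshot so that a handler may register or remove controllers.
            for (MiniController c : new ArrayList<>(controllers)) {
                if (!c.canHandle(event)) {
                    continue;
                }
                if (!c.initialized) { // lazy, one-time initialization
                    c.initialized = true;
                    c.initialize();
                }
                c.handleEvent(event);
            }
        }
    }

    // Builds a controller that accepts events starting with the given prefix.
    static MiniController accepting(final String prefix) {
        return new MiniController() {
            @Override
            boolean canHandle(String event) {
                return event.startsWith(prefix);
            }
        };
    }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        MiniController triggers = accepting("trigger");
        dispatcher.addController(triggers);
        dispatcher.forwardEvent("trigger:job-1");
        dispatcher.forwardEvent("success:job-1");
        System.out.println("handled=" + triggers.handled + ", initCount=" + triggers.initCount);
    }
}
```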

forwardEvent simply calls Dispatcher's dispatch(AppEvent event) method. The call to focus on is controller.handleEvent(event), which is where an event is actually handled. Controller is an abstract class; JobController extends it and overrides handleEvent(AppEvent event). Its implementation is:

public class JobController extends Controller {

    .........
    @Override
    public void handleEvent(AppEvent event) {
        try {
            if (event instanceof JobSuccessEvent) {
                successEventHandle((JobSuccessEvent) event);
            } else if (event instanceof JobFailedEvent) {
                failedEventHandle((JobFailedEvent) event);
            } else if (event instanceof ScheduleTriggerEvent) {
                triggerEventHandle((ScheduleTriggerEvent) event);
            } else if (event instanceof JobMaintenanceEvent) {
                maintenanceEventHandle((JobMaintenanceEvent) event);
            } else if (event instanceof JobLostEvent) {
                lostEventHandle((JobLostEvent) event);
            } else if (event.getType() == Events.Initialize) {
                initializeEventHandle();
            }
        } catch (Exception e) {
            // Catch every exception so that a failure in this job does not affect other jobs
            ScheduleInfoLog.error("JobId:" + jobId + " handleEvent error", e);
        }
    }

    /**
     * Handles a timer-trigger event for a job.
     *
     * @param event
     */
    private void triggerEventHandle(ScheduleTriggerEvent event) {
        String eId = event.getJobId();
        JobDescriptor jobDescriptor = cache.getJobDescriptor();
        if (jobDescriptor == null) {// the job has been deleted; this is an abnormal state, so autofix
            autofix();
            return;
        }
        if (!eId.equals(jobDescriptor.getId())) {
            return;
        }
        ScheduleInfoLog.info("JobId:" + jobId
                + " receive a timer trigger event,statisTime is:"
                + jobDescriptor.getStatisEndTime());
        runJob(jobDescriptor);
    }
    ................
}
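The id comparison in triggerEventHandle suggests that each JobController instance is bound to a single job and silently ignores trigger events addressed to other jobs. The sketch below illustrates that filter under this assumption. `TriggerEvent`, `PerJobController` and `broadcast` are hypothetical names, and the real controller additionally handles success, failure and other event types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a per-job event filter; not taken from the Zeus source.
final class TriggerFilterSketch {

    // Stand-in for ScheduleTriggerEvent: carries only the job id.
    static final class TriggerEvent {
        final String jobId;

        TriggerEvent(String jobId) {
            this.jobId = jobId;
        }
    }

    // Stand-in for a JobController that owns exactly one job.
    static final class PerJobController {
        final String jobId;
        int runs;

        PerJobController(String jobId) {
            this.jobId = jobId;
        }

        void handleEvent(Object event) {
            if (event instanceof TriggerEvent) {
                onTrigger((TriggerEvent) event);
            }
            // Other event types would be routed to their own handlers here.
        }

        private void onTrigger(TriggerEvent event) {
            if (!event.jobId.equals(jobId)) {
                return; // addressed to another job's controller
            }
            runs++; // the real code calls runJob(jobDescriptor) at this point
        }
    }

    // Every controller sees every event; each one decides whether the event is its own.
    static void broadcast(List<PerJobController> controllers, Object event) {
        for (PerJobController c : controllers) {
            c.handleEvent(event);
        }
    }

    public static void main(String[] args) {
        List<PerJobController> controllers = new ArrayList<>();
        controllers.add(new PerJobController("1"));
        controllers.add(new PerJobController("2"));
        broadcast(controllers, new TriggerEvent("2"));
        for (PerJobController c : controllers) {
            System.out.println("job " + c.jobId + " runs=" + c.runs);
        }
    }
}
```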

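handleEvent first checks the event type. For a timer-triggered event it calls triggerEventHandle((ScheduleTriggerEvent) event), shown above. That method reads the jobId carried by the event, loads the job's descriptor (jobDescriptor, which holds the job's version, script content, dependencies and other details), returns without doing anything if the event's jobId does not match the descriptor's id, and otherwise calls runJob(jobDescriptor), the step that actually starts the job. runJob(jobDescriptor) is implemented as follows: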

public class JobController extends Controller {
    .......
    private void runJob(JobDescriptor jobDescriptor) {
        JobHistory history = new JobHistory();
        history.setJobId(jobDescriptor.getId());
        history.setToJobId(jobDescriptor.getToJobId() == null ? null : jobDescriptor.getToJobId());
        history.setTriggerType(TriggerType.SCHEDULE);
        history.setStatisEndTime(jobDescriptor.getStatisEndTime());
        history.setTimezone(jobDescriptor.getTimezone());
        history.setCycle(jobDescriptor.getCycle());
        history.setHostGroupId(jobDescriptor.getHostGroupId());
        history.setOperator(jobDescriptor.getOwner() == null ? null : jobDescriptor.getOwner());
        context.getJobHistoryManager().addJobHistory(history);
        master.run(history);
    }
    .......
}

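The construction of the JobHistory record needs little comment. The part to focus on is the last two calls, which record the run and hand it off for execution: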

context.getJobHistoryManager().addJobHistory(history);
master.run(history);

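The JobHistoryManager obtained from the MasterContext first stores the run history, and the Master then schedules the job based on that history record. Master's run() method does the following: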

    public class Master {

......

public JobHistory run(JobHistory history) {
String jobId = history.getJobId();
int priorityLevel = 3;
try{
JobDescriptor jd = context.getGroupManager().getJobDescriptor(jobId).getX();
String priorityLevelStr = jd.getProperties().get("run.priority.level");
if(priorityLevelStr!=null){
priorityLevel = Integer.parseInt(priorityLevelStr);
}
}catch(Exception ex){
priorityLevel = 3;
}
JobElement element = new JobElement(jobId, history.getHostGroupId(), priorityLevel);
history.setStatus(com.taobao.zeus.model.JobStatus.Status.RUNNING);
if (history.getTriggerType() == TriggerType.MANUAL_RECOVER) {
for (JobElement e : new ArrayList<JobElement>(context.getQueue())) {
if (e.getJobID().equals(jobId)) {
history.getLog().appendZeus("已经在队列中,无法再次运行");
history.setStartTime(new Date());
history.setEndTime(new Date());
history.setStatus(com.taobao.zeus.model.JobStatus.Status.FAILED);
break;
}
}
for (Channel key : context.getWorkers().keySet()) {
MasterWorkerHolder worker = context.getWorkers().get(key);
if (worker.getRunnings().containsKey(jobId)) {
history.getLog().appendZeus("已经在运行中,无法再次运行");
history.setStartTime(new Date());
history.setEndTime(new Date());
history.setStatus(com.taobao.zeus.model.JobStatus.Status.FAILED);
break;
}
}
}

if (history.getStatus() == com.taobao.zeus.model.JobStatus.Status.RUNNING) {
history.getLog().appendZeus(
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
.format(new Date()) + " 进入任务队列");
context.getJobHistoryManager().updateJobHistoryLog(history.getId(),
history.getLog().getContent());
if (history.getTriggerType() == TriggerType.MANUAL) {
element.setJobID(history.getId());
context.getManualQueue().offer(element);
} else {
JobStatus js = context.getGroupManager().getJobStatus(
history.getJobId());
js.setStatus(com.taobao.zeus.model.JobStatus.Status.RUNNING);
js.setHistoryId(history.getId());
context.getGroupManager().updateJobStatus(js);
context.getQueue().offer(element);
}
}
context.getJobHistoryManager().updateJobHistory(history);
context.getJobHistoryManager().updateJobHistoryLog(history.getId(),
history.getLog().getContent());
return history;
}

.......

}
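The priority lookup at the top of run() reduces to "read the run.priority.level property and fall back to 3 if it is missing or unusable". A minimal sketch of just that rule follows. `PrioritySketch` and `priorityOf` are hypothetical names, and the sketch catches only NumberFormatException, whereas the original catches every Exception raised while loading the descriptor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the property name and the default of 3 come from Master.run().
final class PrioritySketch {

    static final int DEFAULT_PRIORITY = 3;

    // Returns the configured priority, or the default when it is absent or not a number.
    static int priorityOf(Map<String, String> properties) {
        if (properties == null) {
            return DEFAULT_PRIORITY;
        }
        String raw = properties.get("run.priority.level");
        if (raw == null) {
            return DEFAULT_PRIORITY;
        }
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException ex) {
            return DEFAULT_PRIORITY;
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println("unset: " + priorityOf(props));
        props.put("run.priority.level", "1");
        System.out.println("set to 1: " + priorityOf(props));
        props.put("run.priority.level", "high");
        System.out.println("not a number: " + priorityOf(props));
    }
}
```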

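run() first determines the job's priority level (the run.priority.level property, defaulting to 3). For a MANUAL_RECOVER trigger it also checks whether the job is already queued or already running on a worker, and marks the run as FAILED if so. If the history is still RUNNING after these checks, a MANUAL run is offered to the manual queue, and every other trigger type, including the scheduled trigger, is offered to the main job queue: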

context.getQueue().offer(element);

There it waits to be picked up. The queue is drained by a task running on a scheduled thread pool, as the following excerpt from Master shows:

 public class Master {

......
// Periodically scan the waiting queues
log.info("The scan rate is " + Environment.getScanRate());
context.getSchedulePool().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
scan();
} catch (Exception e) {
log.error("get job from queue failed!", e);
}
}
}, 0, Environment.getScanRate(), TimeUnit.MILLISECONDS);

// Scan for available workers and assign jobs to them
private void scan() {

if (!context.getQueue().isEmpty()) {
log.info("schedule queue :" +context.getQueue().size());
final JobElement e = context.getQueue().poll();
log.info("priority level :"+e.getPriorityLevel()+"; JobID :"+e.getJobID());
runScheduleAction(e);
}

if (!context.getManualQueue().isEmpty()) {
log.info("manual queue :" +context.getManualQueue().size());
final JobElement e = context.getManualQueue().poll();
log.info("priority level: "+e.getPriorityLevel()+"; JobID:"+e.getJobID());
MasterWorkerHolder selectWorker = getRunableWorker(e.getHostGroupId());

if (selectWorker == null) {
context.getManualQueue().offer(e);
log.info("HostGroupId : " + e.getHostGroupId() + ","+e.getJobID() +" is offered back to queue");
} else {
runManualJob(selectWorker, e.getJobID());
log.info("HostGroupId : " + e.getHostGroupId() + ",schedule selectWorker : " +selectWorker+",host :"+selectWorker.getHeart().host);
}
}

if (!context.getDebugQueue().isEmpty()) {
log.info("debug queue :" +context.getDebugQueue().size() );
final JobElement e = context.getDebugQueue().poll();
log.info("priority level:null; JobID:"+e.getJobID());
MasterWorkerHolder selectWorker = getRunableWorker(e.getHostGroupId());
if (selectWorker == null) {
context.getDebugQueue().offer(e);
log.info("HostGroupId : " + e.getHostGroupId() + ","+e.getJobID() +" is offered back to queue");
} else {
runDebugJob(selectWorker, e.getJobID());
log.info("HostGroupId : " + e.getHostGroupId() + ",schedule selectWorker : " +selectWorker+",host :"+selectWorker.getHeart().host);
}
}

}

private void runScheduleAction(final JobElement e) {
MasterWorkerHolder selectWorker = getRunableWorker(e.getHostGroupId());
if (selectWorker == null) {
context.getExceptionQueue().offer(e);
log.info("HostGroupId : " + e.getHostGroupId() + ","+e.getJobID() +" is offered to exceptionQueue");
} else {
runScheduleJob(selectWorker, e.getJobID());
log.info("HostGroupId : " + e.getHostGroupId() + ",schedule selectWorker : " +selectWorker+",host :"+selectWorker.getHeart().host);
}
}
.......

}

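Master uses the schedule pool held by MasterContext to call scan() at a fixed rate of Environment.getScanRate() milliseconds. For scheduled jobs, each scan takes one element with final JobElement e = context.getQueue().poll() and passes it to runScheduleAction(e). runScheduleAction looks for a runnable worker in the job's host group: if none is available, the element is moved to the ExceptionQueue; otherwise the job is dispatched to the selected worker through runScheduleJob and executed there.
This completes the life cycle of a scheduled job.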
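The schedule-queue branch of scan() together with runScheduleAction can be condensed into one small method: take one element, ask for a worker, and either dispatch the job or park it in the exception queue. The sketch below uses hypothetical stand-ins (`Element`, `WorkerPicker`, the `dispatched` list) and calls the scan pass directly instead of from a fixed-rate scheduler, so that its behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-ins; only the control flow mirrors scan() and runScheduleAction().
final class ScanSketch {

    // Returns a worker name for the host group, or null when no worker can take the job.
    interface WorkerPicker {
        String pick(String hostGroupId);
    }

    static final class Element {
        final String jobId;
        final String hostGroupId;

        Element(String jobId, String hostGroupId) {
            this.jobId = jobId;
            this.hostGroupId = hostGroupId;
        }
    }

    final Queue<Element> queue = new ConcurrentLinkedQueue<>();
    final Queue<Element> exceptionQueue = new ConcurrentLinkedQueue<>();
    final List<String> dispatched = new ArrayList<>(); // entries look like "jobId@worker"

    // One scan pass over the schedule queue; handles at most one element.
    void scanOnce(WorkerPicker picker) {
        Element e = queue.poll(); // null when the queue is empty
        if (e == null) {
            return;
        }
        String worker = picker.pick(e.hostGroupId);
        if (worker == null) {
            exceptionQueue.offer(e); // no runnable worker: park the element
        } else {
            dispatched.add(e.jobId + "@" + worker);
        }
    }

    public static void main(String[] args) {
        ScanSketch s = new ScanSketch();
        s.queue.offer(new Element("job-1", "group-a"));
        s.queue.offer(new Element("job-2", "group-b"));
        WorkerPicker onlyGroupA = g -> "group-a".equals(g) ? "worker-1" : null;
        s.scanOnce(onlyGroupA);
        s.scanOnce(onlyGroupA);
        System.out.println("dispatched=" + s.dispatched + ", parked=" + s.exceptionQueue.size());
    }
}
```

The sketch polls first and then checks for null, rather than checking isEmpty() before poll() as the excerpt does; with a single scanning thread the two are equivalent, and the poll-first form also stays safe if more than one thread ever drains the queue.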

Source code walkthrough: running a job once manually in a Zeus cluster

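Running a job once manually follows much the same execution flow as a timer-triggered run. The main difference is how the run is triggered: a scheduled trigger is an AppEvent, whereas a manual run is an MvcEvent raised from the GWT page. Start with the UI code: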

public class ChooseConfigWindow extends Window{
.......
private TextButton submit=new TextButton("执行", new SelectHandler() {
public void onSelect(SelectEvent event) {
Map<String, String> md=combo.getValue();
if(combo.validate() && md!=null){
if(jobPresenter!=null){
final AutoProgressMessageBox wait=new AutoProgressMessageBox("运行中","doing");
wait.auto();
wait.show();
String actionid = md.get("actionid");
RPCS.getJobService().run(actionid, type, new AbstractAsyncCallback<Void>() {
@Override
public void onSuccess(Void result) {
wait.hide();
Info.display("成功", Startmsg.get(type-1));
hide();
}
@Override
public void onFailure(Throwable caught) {
wait.hide();
AlertMessageBox alert=new AlertMessageBox("失败", Errormsg.get(type-1));
alert.show();
};
});
}
}
}
});
.......
}

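The GWT details of the UI are not of interest here; what matters is the asynchronous request. RPCS.getJobService().run(actionid, type, new AbstractAsyncCallback<Void>() {...}) calls the back end and passes the job version, actionid. On the server side this is the run method of JobServiceImpl: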

public class JobServiceImpl implements JobService {
@Override
public void run(String jobId, int type) throws GwtException {
TriggerType triggerType = null;
JobDescriptor jobDescriptor = null;
ExecuteKind kind = null;
if (type == 1) {
triggerType = TriggerType.MANUAL;
kind = ExecuteKind.ManualKind;
} else if (type == 2) {
triggerType = TriggerType.MANUAL_RECOVER;
kind = ExecuteKind.ScheduleKind;
}
if (!permissionManager.hasActionPermission(
LoginUser.getUser().getUid(), jobId)) {
GwtException e = new GwtException("你没有权限执行该操作");
log.error(e);
throw e;
}
Tuple<JobDescriptor, JobStatus> job = permissionGroupManager
.getJobDescriptor(jobId);
jobDescriptor = job.getX();
JobHistory history = new JobHistory();
history.setJobId(jobId);
history.setToJobId(jobDescriptor.getToJobId());
history.setTriggerType(triggerType);
history.setOperator(jobDescriptor.getOwner());
history.setIllustrate("触发人:" + LoginUser.getUser().getUid());
history.setStatus(Status.RUNNING);
history.setStatisEndTime(jobDescriptor.getStatisEndTime());
history.setTimezone(jobDescriptor.getTimezone());
history.setHostGroupId(jobDescriptor.getHostGroupId());
jobHistoryManager.addJobHistory(history);

try {
worker.executeJobFromWeb(kind, history.getId());
} catch (Exception e) {
log.error("error", e);
throw new GwtException(e.getMessage());
}
}
}

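The first part loads the details of the job version and creates the run history record. The key call is worker.executeJobFromWeb(kind, history.getId()), through which the node issues an RPC request. That request is implemented as follows: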

public class ClientWorker {
    ........

    public void executeJobFromWeb(ExecuteKind kind, String id) throws Exception {
        WebResponse resp = new WorkerWebExecute().send(context, kind, id).get();
        if (resp.getStatus() == Status.ERROR) {
            throw new Exception(resp.getErrorText());
        }
    }
    ...........
}

The send() method it calls lives in WorkerWebExecute:

public class WorkerWebExecute {

public Future<WebResponse> send(final WorkerContext context,ExecuteKind kind,String id){

final WebRequest req=WebRequest.newBuilder().setRid(AtomicIncrease.getAndIncrement()).setOperate(WebOperate.ExecuteJob)
.setEk(kind).setId(id).build();
SocketMessage sm=SocketMessage.newBuilder().setKind(Kind.WEB_REUQEST).setBody(req.toByteString()).build();

Future<WebResponse> f=context.getThreadPool().submit(new Callable<WebResponse>() {
private WebResponse response;
public WebResponse call() throws Exception {
final CountDownLatch latch=new CountDownLatch(1);
context.getHandler().addListener(new ResponseListener() {
public void onWebResponse(WebResponse resp) {
if(resp.getRid()==req.getRid()){
context.getHandler().removeListener(this);
response=resp;
latch.countDown();
}
}
public void onResponse(Response resp) {}
});
latch.await();
return response;
}
});
context.getServerChannel().write(sm);
SocketLog.info("send web execute request,rid="+req.getRid()+",kind="+kind+",id="+id);
return f;
}
}
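Stripped of Netty and protobuf, send() is a request/response correlation: give the request a unique rid, register a listener that recognizes the response carrying the same rid, write the request, and block on a CountDownLatch until that response arrives. The sketch below shows the pattern with plain JDK classes. `LoopbackChannel` is a hypothetical stand-in that answers every request itself, and the sketch deliberately differs from the original in two ways: it registers the listener before writing, and it waits with a timeout instead of indefinitely.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of rid-based correlation; not the Zeus implementation.
final class CorrelationSketch {

    interface Listener {
        void onResponse(int rid, String body);
    }

    // Pretends to be the remote side: answers each request asynchronously with the same rid.
    static final class LoopbackChannel {
        final List<Listener> listeners = new CopyOnWriteArrayList<>();
        private final ExecutorService remote = Executors.newSingleThreadExecutor();

        void write(final int rid, final String body) {
            remote.submit(new Runnable() {
                @Override
                public void run() {
                    deliver(rid, "echo:" + body);
                }
            });
        }

        // Every registered listener sees every response and filters by rid itself.
        void deliver(int rid, String body) {
            for (Listener l : listeners) {
                l.onResponse(rid, body);
            }
        }

        void close() {
            remote.shutdownNow();
        }
    }

    private static final AtomicInteger NEXT_RID = new AtomicInteger();

    // Sends one request and blocks until the response with the matching rid arrives.
    static String call(LoopbackChannel channel, String body, long timeoutMillis) {
        final int rid = NEXT_RID.incrementAndGet();
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<String> result = new AtomicReference<>();
        Listener listener = new Listener() {
            @Override
            public void onResponse(int responseRid, String responseBody) {
                if (responseRid != rid) {
                    return; // a response to some other request
                }
                result.set(responseBody);
                latch.countDown();
            }
        };
        channel.listeners.add(listener); // register before writing, so no response is missed
        try {
            channel.write(rid, body);
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("no response for rid " + rid);
            }
            return result.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for rid " + rid, ex);
        } finally {
            channel.listeners.remove(listener);
        }
    }

    public static void main(String[] args) {
        LoopbackChannel channel = new LoopbackChannel();
        try {
            System.out.println(call(channel, "execute history 42", 2000));
        } finally {
            channel.close();
        }
    }
}
```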

The code looks complicated, but its main job is to serialize the web request with protobuf and write it to the channel:

    context.getServerChannel().write(sm);

Finally, WorkerHandler receives the data arriving on the channel. See its messageReceived() method:


public class WorkerHandler extends SimpleChannelUpstreamHandler{

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
SocketMessage sm=(SocketMessage) e.getMessage();
if(sm.getKind()==Kind.REQUEST){
final Request request=Request.newBuilder().mergeFrom(sm.getBody()).build();
Operate op=request.getOperate();
if(op==Operate.Schedule || op==Operate.Manual || op==Operate.Debug){
completionService.submit(new Callable<Response>() {
private WorkerBeExecute execute=new WorkerBeExecute();
public Response call() throws Exception {
return execute.execute(context, request).get();
}
});
}else if(request.getOperate()==Operate.Cancel){
completionService.submit(new Callable<Response>() {
private WorkerBeCancel cancel=new WorkerBeCancel();
public Response call() throws Exception {
return cancel.execute(context, request).get();
}
});
}
}else if(sm.getKind()==Kind.RESPONSE){
final Response resp=Response.newBuilder().mergeFrom(sm.getBody()).build();
for(ResponseListener lis:listeners){
lis.onResponse(resp);
}
}else if(sm.getKind()==Kind.WEB_RESPONSE){
final WebResponse resp=WebResponse.newBuilder().mergeFrom(sm.getBody()).build();
for(ResponseListener lis:listeners){
lis.onWebResponse(resp);
}
}
super.messageReceived(ctx, e);
}

}

For a manually triggered job, this calls WorkerBeExecute's execute() method, implemented as follows:

public class WorkerBeExecute {
.........
private static Logger log = LoggerFactory.getLogger(WorkerBeExecute.class);
public Future<Response> execute(final WorkerContext context,
final Request req) {
if (req.getOperate() == Operate.Debug) {
return debug(context, req);
} else if (req.getOperate() == Operate.Manual) {
return manual(context, req);
} else if (req.getOperate() == Operate.Schedule) {
return schedule(context, req);
}
return null;
}

public Future<Response> manual(final WorkerContext context,
final Request req) {
ManualMessage mm = null;
try {
mm = ManualMessage.newBuilder().mergeFrom(req.getBody()).build();
} catch (InvalidProtocolBufferException e1) {
}
SocketLog.info("receive master to worker manual request,rid="
+ req.getRid() + ",historyId=" + mm.getHistoryId());
final String historyId = mm.getHistoryId();
final JobHistory history = context.getJobHistoryManager()
.findJobHistory(historyId);
Future<Response> f = context.getThreadPool().submit(
new Callable<Response>() {
public Response call() throws Exception {
history.setExecuteHost(WorkerContext.host);
history.setStartTime(new Date());
context.getJobHistoryManager()
.updateJobHistory(history);

String date = new SimpleDateFormat("yyyy-MM-dd")
.format(new Date());
File direcotry = new File(Environment.getDownloadPath()
+ File.separator + date + File.separator
+ "manual-" + history.getId());
if (!direcotry.exists()) {
direcotry.mkdirs();
}
JobBean jb = context.getGroupManager()
.getUpstreamJobBean(history.getJobId());

final Job job = JobUtils.createJob(new JobContext(JobContext.MANUAL_RUN),
jb, history, direcotry.getAbsolutePath(),
context.getApplicationContext());
context.getManualRunnings().put(historyId, job);

Integer exitCode = -1;
Exception exception = null;
try {
exitCode = job.run();
} catch (Exception e) {
exception = e;
history.getLog().appendZeusException(e);
} finally {
JobHistory jobHistory = context
.getJobHistoryManager()
.findJobHistory(history.getId());
jobHistory.setEndTime(new Date());
if (exitCode == 0) {
jobHistory
.setStatus(com.taobao.zeus.model.JobStatus.Status.SUCCESS);
} else {
jobHistory
.setStatus(com.taobao.zeus.model.JobStatus.Status.FAILED);
}
context.getJobHistoryManager().updateJobHistory(
jobHistory);
history.getLog().appendZeus("exitCode=" + exitCode);
try{
context.getJobHistoryManager().updateJobHistoryLog(
history.getId(),
history.getLog().getContent());
}catch(Exception ex){
log.error("update manual job log exception:", ex);
}
context.getManualRunnings().remove(historyId);
}

Status status = Status.OK;
String errorText = "";
if (exitCode != 0) {
status = Status.ERROR;
}
if (exception != null && exception.getMessage() != null) {
errorText = exception.getMessage();
}
Response resp = Response.newBuilder().setRid(
req.getRid()).setOperate(Operate.Manual)
.setStatus(status).setErrorText(errorText)
.build();
SocketLog
.info("send manual response,manual complete,rid="
+ req.getRid()
+ ",historyId="
+ historyId);
return resp;
}
});
return f;
}

.........
}

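For a manual job, execute() calls manual(). From this point the flow resembles that of an automatically scheduled job. Apart from loading the job's basic information and the protobuf serialization and deserialization, the key steps are context.getManualRunnings().put(historyId, job), which registers the job among the running manual jobs, and job.run(), which executes it inside the Callable submitted to the worker's thread pool. The entry is removed again in the finally block once the run ends.
This completes the request path for running a job once manually; the rest of the execution is handled by Zeus itself. A run triggered from the client is somewhat more involved than an automatically scheduled one: the Netty communication and the protobuf wrapping of request data interleaved throughout make the code harder to read.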
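The core of manual() is a small life cycle: register the job as running, run it, map the exit code to a status, and always unregister it afterwards. A minimal sketch of that life cycle follows. `RunnableJob`, `Outcome` and the static `running` map are hypothetical stand-ins, and the history updates and response building of the original are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of the register/run/unregister life cycle in manual().
final class ManualRunSketch {

    // Stand-in for Zeus's Job: run() returns an exit code, 0 meaning success.
    interface RunnableJob {
        int run() throws Exception;
    }

    enum Outcome { SUCCESS, FAILED }

    // Stand-in for context.getManualRunnings(): jobs currently running, keyed by history id.
    static final Map<String, RunnableJob> running = new ConcurrentHashMap<>();

    static Outcome execute(String historyId, RunnableJob job) {
        running.put(historyId, job);
        int exitCode = -1; // stays -1 if run() throws
        try {
            exitCode = job.run();
        } catch (Exception ex) {
            // The original appends the exception to the history log; omitted here.
        } finally {
            running.remove(historyId); // always unregister, whatever the result
        }
        return exitCode == 0 ? Outcome.SUCCESS : Outcome.FAILED;
    }

    public static void main(String[] args) {
        System.out.println("exit 0: " + execute("h1", () -> 0));
        System.out.println("exit 2: " + execute("h2", () -> 2));
        System.out.println("throws: " + execute("h3", () -> {
            throw new Exception("boom");
        }));
        System.out.println("still registered: " + running.size());
    }
}
```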

Conclusion

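Job scheduling and execution is the core of Zeus. The full path involves Zeus's own event triggering (Event), event listening (Listener) and event dispatch (Dispatcher), RPC between servers, and thread-pool scheduling of the job queues. Traced end to end it is fairly complex, which is the kind of complexity a distributed system can hardly avoid.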