Flink -- Failover

Time: 2022-09-12 14:34:32

 

JobManager Failover

 

LeaderLatch

private synchronized void setLeadership(boolean newValue)
{
    boolean oldValue = hasLeadership.getAndSet(newValue);

    if ( oldValue && !newValue )
    {   // was leader before, not leader now -> lost leadership (was true, now false)
        listeners.forEach(new Function<LeaderLatchListener, Void>()
        {
            @Override
            public Void apply(LeaderLatchListener listener)
            {
                listener.notLeader();
                return null;
            }
        });
    }
    else if ( !oldValue && newValue )
    {   // Gained leadership, was false, now true
        listeners.forEach(new Function<LeaderLatchListener, Void>()
        {
            @Override
            public Void apply(LeaderLatchListener input)
            {
                input.isLeader();
                return null;
            }
        });
    }

    notifyAll();
}
 

ZooKeeperLeaderElectionService

@Override
public void isLeader() {
    synchronized (lock) {
        issuedLeaderSessionID = UUID.randomUUID();
        leaderContender.grantLeadership(issuedLeaderSessionID);
    }
}

@Override
public void notLeader() {
    synchronized (lock) {
        issuedLeaderSessionID = null;
        confirmedLeaderSessionID = null;
        leaderContender.revokeLeadership();
    }
}

As you can see, these callbacks simply delegate to leaderContender.grantLeadership and leaderContender.revokeLeadership respectively.
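
For context, here is a minimal sketch of how a Curator LeaderLatch is typically wired up (my own illustration; the ZooKeeper address, latch path and class name are made up, and this is not Flink's actual wiring):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchSketch {
    public static void main(String[] args) throws Exception {
        // assumed ZooKeeper address and latch path, purely for illustration
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/leaderlatch-demo");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // plays the role of ZooKeeperLeaderElectionService.isLeader()
                System.out.println("gained leadership");
            }

            @Override
            public void notLeader() {
                // plays the role of ZooKeeperLeaderElectionService.notLeader()
                System.out.println("lost leadership");
            }
        });
        latch.start();

        Thread.sleep(Long.MAX_VALUE); // keep the process alive to observe callbacks
    }
}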

 

The JobManager, in turn, implements the LeaderContender interface.

revokeLeadership

val newFuturesToComplete = cancelAndClearEverything(
new Exception("JobManager is no longer the leader."))

 

The key step in cancelAndClearEverything is suspending the ExecutionGraph: execution stops, but the job is not removed from the HA job store, so another JobManager can still recover and re-submit it.

/**
 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
 * not remove the job from the HA job store so that it can be recovered by another JobManager.
 */
private def cancelAndClearEverything(cause: Throwable)
  : Seq[Future[Unit]] = {
  val futures = for ((jobID, (eg, jobInfo)) <- currentJobs) yield {
    future {
      eg.suspend(cause) // suspend the ExecutionGraph

      if (jobInfo.listeningBehaviour != ListeningBehaviour.DETACHED) {
        jobInfo.client ! decorateMessage(
          Failure(new JobExecutionException(jobID, "All jobs are cancelled and cleared.", cause)))
      }
    }(context.dispatcher)
  }

  currentJobs.clear()

  futures.toSeq
}

 

grantLeadership

context.system.scheduler.scheduleOnce(
  jobRecoveryTimeout,
  self,
  decorateMessage(RecoverAllJobs))(
  context.dispatcher)

The main task is to recover all jobs, handled by the RecoverAllJobs message:

case RecoverAllJobs =>
  future {
    try {
      // The ActorRef, which is part of the submitted job graph can only be
      // de-serialized in the scope of an actor system.
      akka.serialization.JavaSerializer.currentSystem.withValue(
        context.system.asInstanceOf[ExtendedActorSystem]) {

        log.info(s"Attempting to recover all jobs.")

        // read all submitted jobs back from the SubmittedJobGraphStore (i.e. from ZooKeeper)
        val jobGraphs = submittedJobGraphs.recoverJobGraphs().asScala

        if (!leaderElectionService.hasLeadership()) {
          // we've lost leadership. mission: abort.
          log.warn(s"Lost leadership during recovery. Aborting recovery of ${jobGraphs.size} " +
            s"jobs.")
        } else {
          log.info(s"Re-submitting ${jobGraphs.size} job graphs.")

          jobGraphs.foreach {
            submittedJobGraph =>
              // recover each job
              self ! decorateMessage(RecoverSubmittedJob(submittedJobGraph))
          }
        }
      }
    } catch {
      case t: Throwable => log.error("Fatal error: Failed to recover jobs.", t)
    }
  }(context.dispatcher)

 

When recovering a job:

case RecoverSubmittedJob(submittedJobGraph) =>
  if (!currentJobs.contains(submittedJobGraph.getJobId)) {
    submitJob(
      submittedJobGraph.getJobGraph(),
      submittedJobGraph.getJobInfo(),
      isRecovery = true)
  }
  else {
    log.info(s"Ignoring job recovery for ${submittedJobGraph.getJobId}, " +
      s"because it is already submitted.")
  }

This is essentially just re-submitting the job; note the isRecovery = true flag.

When submitting a job with isRecovery = true, the following is done; the subsequent steps are covered in the Checkpoint article.

if (isRecovery) {
    executionGraph.restoreLatestCheckpointedState()
}
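
restoreLatestCheckpointedState only has something to restore if checkpointing was enabled for the job in the first place. A minimal sketch of enabling it, assuming the standard DataStream API (the interval and class name here are arbitrary; the details belong to the Checkpoint article):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 10 seconds, so a recovered job has state to restore
        env.enableCheckpointing(10_000);

        // ... define sources/operators/sinks here, then:
        // env.execute("checkpointed job");
    }
}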

 

TaskManager Failover

Inside the JobManager, a dead TaskManager is detected via Akka death watch:

/**
 * Handler to be executed when a task manager terminates.
 * (Akka Deathwatch or notification from ResourceManager)
 *
 * @param taskManager The ActorRef of the taskManager
 */
private def handleTaskManagerTerminated(taskManager: ActorRef): Unit = {
  if (instanceManager.isRegistered(taskManager)) {
    log.info(s"Task manager ${taskManager.path} terminated.")

    instanceManager.unregisterTaskManager(taskManager, true)
    context.unwatch(taskManager)
  }
}
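
For reference, Akka death watch in general looks roughly like the following sketch (my own illustration of the mechanism, not Flink code, and the class name is made up): the watcher calls context.watch on the target actor and later receives a Terminated message when that actor dies.

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;

public class DeathWatchSketch extends AbstractActor {
    private final ActorRef watched;

    public DeathWatchSketch(ActorRef watched) {
        this.watched = watched;
    }

    public static Props props(ActorRef watched) {
        return Props.create(DeathWatchSketch.class, () -> new DeathWatchSketch(watched));
    }

    @Override
    public void preStart() {
        // register the death watch; a Terminated message arrives when 'watched' dies
        getContext().watch(watched);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(Terminated.class, t -> {
                // same idea as handleTaskManagerTerminated above
                System.out.println("watched actor terminated: " + t.getActor().path());
            })
            .build();
    }
}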

Inside instanceManager.unregisterTaskManager:

/**
 * Unregisters the TaskManager with the given {@link ActorRef}. Unregistering means to mark
 * the given instance as dead and notify {@link InstanceListener} about the dead instance.
 *
 * @param instanceID TaskManager which is about to be marked dead.
 */
public void unregisterTaskManager(ActorRef instanceID, boolean terminated) {
    Instance instance = registeredHostsByConnection.get(instanceID);

    if (instance != null) {
        ActorRef host = instance.getActorGateway().actor();

        registeredHostsByConnection.remove(host);
        registeredHostsById.remove(instance.getId());
        registeredHostsByResource.remove(instance.getResourceId());

        if (terminated) {
            deadHosts.add(instance.getActorGateway().actor());
        }

        instance.markDead();

        totalNumberOfAliveTaskSlots -= instance.getTotalNumberOfSlots();

        notifyDeadInstance(instance);
    }
}

 

instance.markDead()

public void markDead() {

    // create a copy of the slots to avoid concurrent modification exceptions
    List<Slot> slots;

    synchronized (instanceLock) {
        if (isDead) {
            return;
        }
        isDead = true;

        // no more notifications for the slot releasing
        this.slotAvailabilityListener = null;

        slots = new ArrayList<Slot>(allocatedSlots);
        allocatedSlots.clear();
        availableSlots.clear();
    }

    /*
     * releaseSlot must not own the instanceLock in order to avoid dead locks where a slot
     * owning the assignment group lock wants to give itself back to the instance which requires
     * the instance lock
     */
    for (Slot slot : slots) {
        slot.releaseSlot();
    }
}

 

SimpleSlot.releaseSlot

@Override
public void releaseSlot() {
    if (!isCanceled()) {
        // kill all tasks currently running in this slot
        Execution exec = this.executedTask;
        if (exec != null && !exec.isFinished()) {
            exec.fail(new Exception(
                "The slot in which the task was executed has been released. Probably loss of TaskManager "
                    + getInstance()));
        }

        // release directly (if we are directly allocated),
        // otherwise release through the parent shared slot
        if (getParent() == null) {
            // we have to give back the slot to the owning instance
            if (markCancelled()) {
                getInstance().returnAllocatedSlot(this);
            }
        } else {
            // we have to ask our parent to dispose us
            getParent().releaseChild(this);
        }
    }
}

 

Execution.fail

public void fail(Throwable t) {
    processFail(t, false);
}

 

Execution.processFail

First, the Execution's state is set to FAILED:

transitionState(current, FAILED, t)

private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
    if (STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
        markTimestamp(targetState);

        try {
            vertex.notifyStateTransition(attemptId, targetState, error);
        }
        catch (Throwable t) {
            LOG.error("Error while notifying execution graph of execution state transition.", t);
        }

        return true;
    } else {
        return false;
    }
}

After the state is set, notifyStateTransition is invoked:

getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);

void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
        ExecutionState newExecutionState, Throwable error) {
    ExecutionJobVertex vertex = getJobVertex(vertexId);

    if (executionListenerActors.size() > 0) {
        String message = error == null ? null : ExceptionUtils.stringifyException(error);
        ExecutionGraphMessages.ExecutionStateChanged actorMessage =
            new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId, vertex.getJobVertex().getName(),
                                                             vertex.getParallelism(), subtask,
                                                             executionID, newExecutionState,
                                                             System.currentTimeMillis(), message);

        for (ActorGateway listener : executionListenerActors) {
            listener.tell(actorMessage);
        }
    }

    // see what this means for us. currently, the first FAILED state means -> FAILED
    if (newExecutionState == ExecutionState.FAILED) {
        fail(error);
    }
}

This mainly sends an ExecutionGraphMessages.ExecutionStateChanged message to all registered listeners.

The listeners are registered in the JobManager when the job is submitted:

if (jobInfo.listeningBehaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
  // the sender wants to be notified about state changes
  val gateway = new AkkaActorGateway(jobInfo.client, leaderSessionID.orNull)

  executionGraph.registerExecutionListener(gateway)
  executionGraph.registerJobStatusListener(gateway)
}

On the client side, the JobClientActor simply logs and prints these messages:
if (message instanceof ExecutionGraphMessages.ExecutionStateChanged) {
    logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) message);
} else if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
    logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) message);
}

 

Note that when newExecutionState == ExecutionState.FAILED, ExecutionGraph.fail is called. As the comment above says, the first FAILED execution means the whole job has failed:

public void fail(Throwable t) {
    while (true) {
        JobStatus current = state;

        // stay in these states
        if (current == JobStatus.FAILING ||
            current == JobStatus.SUSPENDED ||
            current.isGloballyTerminalState()) {
            return;
        } else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
            synchronized (progressLock) {
                postRunCleanup();
                progressLock.notifyAll();
                return;
            }
        } else if (transitionState(current, JobStatus.FAILING, t)) { // set the job state to JobStatus.FAILING
            this.failureCause = t;

            if (!verticesInCreationOrder.isEmpty()) {
                // cancel all. what is failed will not cancel but stay failed
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                    ejv.cancel();
                }
            } else {
                // set the state of the job to failed
                transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
            }

            return;
        }
    }
}

As you can see, the job state is set directly to FAILING, and cancel is called on every ExecutionJobVertex.

 

Next, this execution is deregistered from the ExecutionGraph:

vertex.getExecutionGraph().deregisterExecution(this);
Execution contained = currentExecutions.remove(exec.getAttemptId());

 

Finally, it calls:

vertex.executionFailed(t);

void executionFailed(Throwable t) {
    jobVertex.vertexFailed(subTaskIndex, t);
}

 

ExecutionJobVertex

void vertexFailed(int subtask, Throwable error) {
    subtaskInFinalState(subtask);
}

private void subtaskInFinalState(int subtask) {
    synchronized (stateMonitor) {
        if (!finishedSubtasks[subtask]) {
            finishedSubtasks[subtask] = true;

            if (numSubtasksInFinalState + 1 == parallelism) { // check whether all subtasks of this vertex have finished

                // call finalizeOnMaster hook
                try {
                    getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
                }
                catch (Throwable t) {
                    getGraph().fail(t);
                }

                numSubtasksInFinalState++;

                // we are in our final state
                stateMonitor.notifyAll();

                // tell the graph
                graph.jobVertexInFinalState();
            } else {
                numSubtasksInFinalState++;
            }
        }
    }
}

graph.jobVertexInFinalState()

void jobVertexInFinalState() {
    synchronized (progressLock) {
        numFinishedJobVertices++;

        if (numFinishedJobVertices == verticesInCreationOrder.size()) { // have all JobVertices reached a final state?

            // we are done, transition to the final state
            JobStatus current;
            while (true) {
                current = this.state;

                if (current == JobStatus.RUNNING) {
                    if (transitionState(current, JobStatus.FINISHED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.CANCELLING) {
                    if (transitionState(current, JobStatus.CANCELED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.FAILING) {
                    boolean allowRestart = !(failureCause instanceof SuppressRestartsException);

                    if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
                        restartStrategy.restart(this);
                        break;
                    } else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.SUSPENDED) {
                    // we've already cleaned up when entering the SUSPENDED state
                    break;
                }
                else if (current.isGloballyTerminalState()) {
                    LOG.warn("Job has entered globally terminal state without waiting for all " +
                        "job vertices to reach final state.");
                    break;
                }
                else {
                    fail(new Exception("ExecutionGraph went into final state from state " + current));
                    break;
                }
            }

            // done transitioning the state

            // also, notify waiters
            progressLock.notifyAll();
        }
    }
}

If the job status is JobStatus.FAILING and the restart conditions are satisfied, transitionState(current, JobStatus.RESTARTING) succeeds and then:

restartStrategy.restart(this);
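
The restart strategy is pluggable. A minimal sketch of configuring one per job, assuming the standard RestartStrategies API (the class name and values here are arbitrary; a strategy can also be configured globally):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // retry the job up to 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // ... build the job and call env.execute(...)
    }
}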

Whichever restart strategy is configured, it eventually calls:

executionGraph.restart();

public void restart() {
    try {
        synchronized (progressLock) {
            JobStatus current = state;

            if (current == JobStatus.CANCELED) {
                LOG.info("Canceled job during restart. Aborting restart.");
                return;
            } else if (current == JobStatus.FAILED) {
                LOG.info("Failed job during restart. Aborting restart.");
                return;
            } else if (current == JobStatus.SUSPENDED) {
                LOG.info("Suspended job during restart. Aborting restart.");
                return;
            } else if (current != JobStatus.RESTARTING) {
                throw new IllegalStateException("Can only restart job from state restarting.");
            }

            if (scheduler == null) {
                throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
            }

            this.currentExecutions.clear();

            Collection<CoLocationGroup> colGroups = new HashSet<>();

            for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
                CoLocationGroup cgroup = jv.getCoLocationGroup();
                if (cgroup != null && !colGroups.contains(cgroup)) {
                    cgroup.resetConstraints();
                    colGroups.add(cgroup);
                }

                jv.resetForNewExecution();
            }

            for (int i = 0; i < stateTimestamps.length; i++) {
                if (i != JobStatus.RESTARTING.ordinal()) {
                    // Only clear the non restarting state in order to preserve when the job was
                    // restarted. This is needed for the restarting time gauge
                    stateTimestamps[i] = 0;
                }
            }

            numFinishedJobVertices = 0;
            transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

            // if we have checkpointed state, reload it into the executions
            if (checkpointCoordinator != null) {
                // reload the latest checkpoint and its state
                boolean restored = checkpointCoordinator
                    .restoreLatestCheckpointedState(getAllVertices(), false, false);

                // TODO(uce) Temporary work around to restore initial state on
                // failure during recovery. Will be superseded by FLINK-3397.
                if (!restored && savepointCoordinator != null) {
                    String savepointPath = savepointCoordinator.getSavepointRestorePath();
                    if (savepointPath != null) {
                        savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
                    }
                }
            }
        }

        // re-schedule the ExecutionGraph, effectively re-submitting the job
        scheduleForExecution(scheduler);
    }
    catch (Throwable t) {
        fail(t);
    }
}
