Flink - Checkpoint

时间:2022-11-13 16:35:51

Flink在流上最大的特点,就是引入了全局snapshot。

 

CheckpointCoordinator

做snapshot的核心组件为, CheckpointCoordinator

/**
* The checkpoint coordinator coordinates the distributed snapshots of operators and state.
* It triggers the checkpoint by sending the messages to the relevant tasks and collects the
* checkpoint acknowledgements. It also collects and maintains the overview of the state handles
* reported by the tasks that acknowledge the checkpoint.
*
* <p>Depending on the configured {@link RecoveryMode}, the behaviour of the {@link
* CompletedCheckpointStore} and {@link CheckpointIDCounter} change. The default standalone
* implementations don't support any recovery.
*/
public class CheckpointCoordinator { /** Tasks who need to be sent a message when a checkpoint is started */
private final ExecutionVertex[] tasksToTrigger; //需要触发checkpoint的tasks /** Tasks who need to acknowledge a checkpoint before it succeeds */
private final ExecutionVertex[] tasksToWaitFor; /** Tasks who need to be sent a message when a checkpoint is confirmed */
private final ExecutionVertex[] tasksToCommitTo; /** Map from checkpoint ID to the pending checkpoint */
private final Map<Long, PendingCheckpoint> pendingCheckpoints; /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore; //用于记录已经完成的checkpoints /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
private final ArrayDeque<Long> recentPendingCheckpoints; /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
* need to be ascending across job managers. */
protected final CheckpointIDCounter checkpointIdCounter; //保证产生递增的checkpoint id,即使当jobmanager crash,也有保证全局checkpoint id是递增的 /** The base checkpoint interval. Actual trigger time may be affected by the
* max concurrent checkpoints and minimum-pause values */
private final long baseInterval; //触发checkpoint的时间间隔 /** The max time (in ms) that a checkpoint may take */
private final long checkpointTimeout; //一次checkpoint消耗的最大时间,超过,我们就可以认为该checkpoint超时失败 /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
private final long minPauseBetweenCheckpoints; //checkpoint之间的最小间隔 /** The maximum number of checkpoints that may be in progress at the same time */
private final int maxConcurrentCheckpointAttempts; //最多同时存在多少checkpoint /** Actor that receives status updates from the execution graph this coordinator works for */
private ActorGateway jobStatusListener; /** The number of consecutive failed trigger attempts */
private int numUnsuccessfulCheckpointsTriggers; private ScheduledTrigger currentPeriodicTrigger; /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling; /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
* accessed in synchronized scope */
private boolean triggerRequestQueued; /** Flag marking the coordinator as shut down (not accepting any messages any more) */
private volatile boolean shutdown; //注意是volatile,保证可见性 /** Shutdown hook thread to clean up state handles. */
private final Thread shutdownHook; /** Helper for tracking checkpoint statistics */
private final CheckpointStatsTracker statsTracker; public CheckpointCoordinator(
JobID job,
long baseInterval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpointAttempts,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
ClassLoader userClassLoader,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
RecoveryMode recoveryMode,
CheckpointStatsTracker statsTracker) throws Exception { checkpointIDCounter.start(); //开启CheckpointIDCounter this.timer = new Timer("Checkpoint Timer", true); this.statsTracker = checkNotNull(statsTracker); if (recoveryMode == RecoveryMode.STANDALONE) { // 如果是standalone模式,需要加上shutdownHook来清理state
// Add shutdown hook to clean up state handles when no checkpoint recovery is
// possible. In case of another configured recovery mode, the checkpoints need to be
// available for the standby job managers.
this.shutdownHook = new Thread(new Runnable() {
@Override
public void run() {
try {
CheckpointCoordinator.this.shutdown(); //显示的调用shutdown
}
catch (Throwable t) {
LOG.error("Error during shutdown of checkpoint coordinator via " +
"JVM shutdown hook: " + t.getMessage(), t);
}
}
}); try {
// Add JVM shutdown hook to call shutdown of service
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
catch (IllegalStateException ignored) {
// JVM is already shutting down. No need to do anything.
}
catch (Throwable t) {
LOG.error("Cannot register checkpoint coordinator shutdown hook.", t);
}
}
else {
this.shutdownHook = null;
}
}

 

CheckpointIDCounter

有两种,

StandaloneCheckpointIDCounter

这种情况下,counter只是用AtomicLong来实现的;如果JobManager挂了,这个值就丢了,重启后无法保证checkpoint id继续递增

但这里说,在standalone的情况下,不需要做recovery,所以这个是可以接受的

/**
 * {@link CheckpointIDCounter} used by JobManagers running in {@link RecoveryMode#STANDALONE}.
 *
 * <p>The counter is a plain in-memory {@link AtomicLong}. Nothing is persisted, which is
 * sufficient because job managers are not recoverable in this recovery mode.
 */
public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {

    /** The next checkpoint ID to hand out; counting starts at 1. */
    private final AtomicLong nextCheckpointId = new AtomicLong(1);

    /** No-op: the in-memory counter needs no start-up work. */
    @Override
    public void start() throws Exception {
    }

    /** No-op: there is nothing to release. */
    @Override
    public void stop() throws Exception {
    }

    /** Returns the current ID and atomically advances the counter by one. */
    @Override
    public long getAndIncrement() throws Exception {
        final long currentId = nextCheckpointId.getAndIncrement();
        return currentId;
    }

    /** Overwrites the counter so that the next handed-out ID is {@code newCount}. */
    @Override
    public void setCount(long newCount) {
        nextCheckpointId.set(newCount);
    }
}

 

ZooKeeperCheckpointIDCounter

这种counter用zk的persistent node来保存当前的计数,以保证计数的递增

/**
* {@link CheckpointIDCounter} instances for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
*
* <p>Each counter creates a ZNode:
* <pre>
* +----O /flink/checkpoint-counter/&lt;job-id&gt; 1 [persistent]
* .
* .
* .
* +----O /flink/checkpoint-counter/&lt;job-id&gt; N [persistent]
* </pre>
*
* <p>The checkpoints IDs are required to be ascending (per job). In order to guarantee this in case
* of job manager failures we use ZooKeeper to have a shared counter across job manager instances.
*/
public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter

 

CompletedCheckpointStore

接口,用于记录有哪些已经完成的checkpoint

/**
 * A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
 *
 * <p>Used to keep track of the checkpoints that have been completed.
 */
public interface CompletedCheckpointStore {

    /**
     * Recover available {@link CompletedCheckpoint} instances.
     *
     * <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
     * available checkpoint.
     */
    void recover() throws Exception;

    /**
     * Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
     *
     * <p>Only a bounded number of checkpoints is kept. When exceeding the maximum number of
     * retained checkpoints, the oldest one will be discarded via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     *
     * @param checkpoint The completed checkpoint to add.
     */
    void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;

    /**
     * Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
     * added.
     */
    CompletedCheckpoint getLatestCheckpoint() throws Exception;

    /**
     * Discards all added {@link CompletedCheckpoint} instances via {@link
     * CompletedCheckpoint#discard(ClassLoader)}.
     */
    void discardAllCheckpoints() throws Exception;

    /**
     * Returns all {@link CompletedCheckpoint} instances.
     *
     * <p>Returns an empty list if no checkpoint has been added yet.
     */
    List<CompletedCheckpoint> getAllCheckpoints() throws Exception;

    /**
     * Returns the current number of retained checkpoints.
     */
    int getNumberOfRetainedCheckpoints();
}

 

看下StandaloneCompletedCheckpointStore,其实就是一个用于记录CompletedCheckpoint的ArrayDeque

/**
 * {@link CompletedCheckpointStore} excerpt: only the backing field is shown here.
 */
class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {

    /** The completed checkpoints, held in an in-memory {@link ArrayDeque}. */
private final ArrayDeque<CompletedCheckpoint> checkpoints;
}

ZooKeeperCompletedCheckpointStore,这个就是用zk来记录

/**
* {@link CompletedCheckpointStore} for JobManagers running in {@link RecoveryMode#ZOOKEEPER}.
*
* <p>Checkpoints are added under a ZNode per job:
* <pre>
* +----O /flink/checkpoints/&lt;job-id&gt; [persistent]
* . |
* . +----O /flink/checkpoints/&lt;job-id&gt;/1 [persistent]
* . . .
* . . .
* . . .
* . +----O /flink/checkpoints/&lt;job-id&gt;/N [persistent]
* </pre>
*
* <p>During recovery, the latest checkpoint is read from ZooKeeper. If there is more than one,
* only the latest one is used and older ones are discarded (even if the maximum number
* of retained checkpoints is greater than one).
*
* <p>If there is a network partition and multiple JobManagers run concurrent checkpoints for the
* same program, it is OK to take any valid successful checkpoint as long as the "history" of
* checkpoints is consistent. Currently, after recovery we start out with only a single
* checkpoint to circumvent those situations.
*/
public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {

 

 

做snapshot流程

StreamingJobGraphGenerator

配置checkpoint
private void configureCheckpointing() {
CheckpointConfig cfg = streamGraph.getCheckpointConfig(); //取出Checkpoint的配置 if (cfg.isCheckpointingEnabled()) {
long interval = cfg.getCheckpointInterval(); //Checkpoint的时间间隔 // collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>(); // collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size()); // collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(); for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) { //只有对source vertex,才加入triggerVertices,因为只需要在源头触发checkpoint
triggerVertices.add(vertex.getID());
}
// TODO: add check whether the user function implements the checkpointing interface
commitVertices.add(vertex.getID()); //当前所有节点都会加入commitVertices和ackVertices
ackVertices.add(vertex.getID());
} JobSnapshottingSettings settings = new JobSnapshottingSettings( //生成JobSnapshottingSettings
triggerVertices, ackVertices, commitVertices, interval,
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints());
jobGraph.setSnapshotSettings(settings); //调用setSnapshotSettings // if the user enabled checkpointing, the default number of exec retries is infinitive.
int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
if(executionRetries == -1) {
streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
}
}
}

 

JobManager

submitJob的时候,将JobGraph中的配置,放到ExecutionGraph中去

/**
 * Excerpt of job submission: transfers the snapshot (checkpoint) settings of the JobGraph
 * to the ExecutionGraph. Names such as `executionGraph` and `userCodeLoader` are defined in
 * parts of the method that are not shown here.
 */
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {

  // configure the state checkpointing
  val snapshotSettings = jobGraph.getSnapshotSettings
  if (snapshotSettings != null) {
    val jobId = jobGraph.getJobID()

    // resolves a vertex ID to its ExecutionJobVertex, failing the submission if it is unknown
    val idToVertex: JobVertexID => ExecutionJobVertex = id => {
      val vertex = executionGraph.getJobVertex(id)
      if (vertex == null) {
        throw new JobSubmissionException(jobId,
          "The snapshot checkpointing settings refer to non-existent vertex " + id)
      }
      vertex
    }

    val triggerVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToTrigger().asScala.map(idToVertex).asJava

    val ackVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToAcknowledge().asScala.map(idToVertex).asJava

    val confirmVertices: java.util.List[ExecutionJobVertex] =
      snapshotSettings.getVerticesToConfirm().asScala.map(idToVertex).asJava

    val completedCheckpoints = checkpointRecoveryFactory
      .createCompletedCheckpoints(jobId, userCodeLoader)

    val checkpointIdCounter = checkpointRecoveryFactory.createCheckpointIDCounter(jobId)

    executionGraph.enableSnapshotCheckpointing(
      snapshotSettings.getCheckpointInterval,
      snapshotSettings.getCheckpointTimeout,
      snapshotSettings.getMinPauseBetweenCheckpoints,
      snapshotSettings.getMaxConcurrentCheckpoints,
      triggerVertices,
      ackVertices,
      confirmVertices,
      context.system,
      leaderSessionID.orNull,
      checkpointIdCounter,
      completedCheckpoints,
      recoveryMode,
      savepointStore)
  }
}

 

ExecutionGraph

创建checkpointCoordinator对象

public void enableSnapshotCheckpointing(
long interval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
ActorSystem actorSystem,
UUID leaderSessionID,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
RecoveryMode recoveryMode,
StateStore<Savepoint> savepointStore) throws Exception { ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); // disable to make sure existing checkpoint coordinators are cleared
disableSnaphotCheckpointing(); if (isStatsDisabled) {
checkpointStatsTracker = new DisabledCheckpointStatsTracker();
}
else {
int historySize = jobConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE); checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, tasksToWaitFor);
} // create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
jobID,
interval,
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
userClassLoader,
checkpointIDCounter,
completedCheckpointStore,
recoveryMode,
checkpointStatsTracker); // the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener( //将checkpointCoordinator的actor注册到jobStatusListenerActors,这样当job状态变化时,可以通知checkpointCoordinator
checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));

这里看到checkpointCoordinator 作为ExecutionGraph的成员,

接着会异步的提交ExecutionGraph,

// execute the recovery/writing the jobGraph into the SubmittedJobGraphStore asynchronously
// because it is a blocking operation
future {
try {
if (isRecovery) {
executionGraph.restoreLatestCheckpointedState() //恢复CheckpointedState
}
else {
//......
}
submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo)) //把jobGraph放到submittedJobGraphs中track
} jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID)) //告诉client,job提交成功 if (leaderElectionService.hasLeadership) {
executionGraph.scheduleForExecution(scheduler) //真正的调度executionGraph
} else {
//......
}
} catch {
//.......
}
}(context.dispatcher)

 

CheckpointCoordinatorDeActivator

/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {

    /** The coordinator whose periodic scheduler is switched on and off. */
    private final CheckpointCoordinator coordinator;

    /** Leader session ID reported by {@link #getLeaderSessionID()}; stored as given. */
    private final UUID leaderSessionID;

    /**
     * Creates the actor for the given coordinator.
     *
     * @param coordinator The checkpoint coordinator to start/stop; must not be null.
     * @param leaderSessionID The leader session ID to report; stored without validation.
     * @throws NullPointerException If {@code coordinator} is null.
     */
    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator, UUID leaderSessionID) {
        if (coordinator == null) {
            throw new NullPointerException("The checkpointCoordinator must not be null.");
        }
        this.coordinator = coordinator;
        this.leaderSessionID = leaderSessionID;
    }

    /**
     * Reacts to {@code JobStatusChanged} messages: the scheduler runs only while the job is
     * {@code RUNNING}. All other messages are ignored.
     */
    @Override
    public void handleMessage(Object message) {
        if (message instanceof ExecutionGraphMessages.JobStatusChanged) {
            JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus();

            if (status == JobStatus.RUNNING) {
                // start the checkpoint scheduler
                coordinator.startCheckpointScheduler();
            } else {
                // anything else should stop the trigger for now
                coordinator.stopCheckpointScheduler();
            }
        }

        // we ignore all other messages
    }

    @Override
    public UUID getLeaderSessionID() {
        return leaderSessionID;
    }
}

在job状态发生变化时,需要打开或关闭Checkpoint scheduler

 

CheckpointCoordinator

开启定时startCheckpointScheduler

/**
 * (Re)starts periodic checkpoint triggering: cancels any previous schedule, then registers a
 * fresh {@link ScheduledTrigger} with the timer at the base interval.
 */
public void startCheckpointScheduler() {
    synchronized (lock) {
        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;

        final ScheduledTrigger trigger = new ScheduledTrigger();
        currentPeriodicTrigger = trigger;
        timer.scheduleAtFixedRate(trigger, baseInterval, baseInterval);
    }
}

/** Timer task that triggers one checkpoint per firing, stamped with the current time. */
private class ScheduledTrigger extends TimerTask {

    @Override
    public void run() {
        try {
            final long now = System.currentTimeMillis();
            triggerCheckpoint(now);
        }
        catch (Exception e) {
            // log and carry on
            LOG.error("Exception while triggering checkpoint", e);
        }
    }
}

 

triggerCheckpoint,用于触发一次checkpoint

/**
 * Triggers a new checkpoint and uses the given timestamp as the checkpoint
 * timestamp.
 *
 * <p>NOTE: this is an excerpt. {@code ackTasks} and {@code triggerIDs} are not defined in the
 * lines shown here; they come from a part of the method that is not included.
 *
 * @param timestamp The timestamp for the checkpoint.
 * @param nextCheckpointId The checkpoint ID to use for this checkpoint or <code>-1</code> if
 * the checkpoint ID counter should be queried.
 * @return <code>true</code> if the trigger messages were sent, <code>false</code> if obtaining
 * the checkpoint ID or sending the trigger messages failed.
 */
public boolean triggerCheckpoint(long timestamp, long nextCheckpointId) throws Exception {

    // we will actually trigger this checkpoint!
    final long checkpointID;
    if (nextCheckpointId < 0) {
        try {
            // this must happen outside the locked scope, because it communicates
            // with external services (in HA mode) and may block for a while.
            checkpointID = checkpointIdCounter.getAndIncrement();
        }
        catch (Throwable t) {
            // do not swallow the failure: count it, report it and give up on this attempt
            int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
            LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful +
                    " consecutive failed attempts so far)", t);
            return false;
        }
    }
    else {
        checkpointID = nextCheckpointId;
    }

    // A triggered but not yet completed checkpoint is tracked as a PendingCheckpoint. It is
    // handed all tasks that have to acknowledge it (ackTasks); once every one of them has
    // acknowledged, the PendingCheckpoint is turned into a CompletedCheckpoint.
    final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);

    // schedule the timer that will clean up the expired checkpoints
    TimerTask canceller = new TimerTask() {
        @Override
        public void run() {
            try {
                synchronized (lock) {
                    // only do the work if the checkpoint is not discarded anyways
                    // note that checkpoint completion discards the pending checkpoint object
                    if (!checkpoint.isDiscarded()) {
                        LOG.info("Checkpoint " + checkpointID + " expired before completing.");

                        checkpoint.discard(userClassLoader);
                        pendingCheckpoints.remove(checkpointID);
                        rememberRecentCheckpointId(checkpointID);

                        onCancelCheckpoint(checkpointID);

                        triggerQueuedRequests();
                    }
                }
            }
            catch (Throwable t) {
                LOG.error("Exception while handling checkpoint timeout", t);
            }
        }
    };

    try {
        // re-acquire the lock
        synchronized (lock) {
            // track the pending checkpoint and arm its timeout canceller
            pendingCheckpoints.put(checkpointID, checkpoint);
            timer.schedule(canceller, checkpointTimeout);
        }
        // end of lock scope

        // send the messages to the tasks that trigger their checkpoint
        // (tasksToTrigger holds the source tasks only)
        for (int i = 0; i < tasksToTrigger.length; i++) {
            ExecutionAttemptID id = triggerIDs[i];
            TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
            tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
        }

        numUnsuccessfulCheckpointsTriggers = 0;
        return true;
    }
    catch (Throwable t) {
        // undo the registration so the failed attempt does not linger as a pending checkpoint
        synchronized (lock) {
            pendingCheckpoints.remove(checkpointID);
        }

        int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
        LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful +
                " consecutive failed attempts so far)", t);

        if (!checkpoint.isDiscarded()) {
            checkpoint.discard(userClassLoader);
        }
        return false;
    }
}

---------上面只会给所有的source发checkpoint message,所以下面的流程只有source会走到-----------

 

TaskManager

sendMessageToCurrentExecution,发送的message最终会被TaskManager收到,

/**
 * Handler for messages related to checkpoints.
 *
 * Looks up the addressed task in `runningTasks` and forwards the request to it; requests
 * for tasks that are not found are only logged.
 *
 * @param actorMessage The checkpoint message.
 */
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

  actorMessage match {
    // request to trigger a checkpoint
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      // look up the task that is actually running
      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, timestamp)
      } else {
        log.debug(
          s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
      }

    // notification that a checkpoint has been completed
    case message: NotifyCheckpointComplete =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.notifyCheckpointComplete(checkpointId)
      } else {
        log.debug(
          s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
      }

    // unknown checkpoint message
    case _ => unhandled(actorMessage)
  }
}

 

Task

/**
 * Asks the task's invokable to draw a checkpoint, asynchronously.
 *
 * <p>Nothing happens unless the task is {@code RUNNING}, has an invokable, and that invokable
 * is a {@link StatefulTask}. A failure while triggering fails the task externally.
 *
 * @param checkpointID The ID of the checkpoint to trigger.
 * @param checkpointTimestamp The timestamp of the checkpoint.
 */
public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
    // read the field once so the checks and the cast below see the same reference
    final AbstractInvokable currentInvokable = this.invokable;

    if (executionState != ExecutionState.RUNNING || currentInvokable == null) {
        return;
    }
    if (!(currentInvokable instanceof StatefulTask)) {
        return;
    }

    // build a local closure
    final StatefulTask<?> statefulTask = (StatefulTask<?>) currentInvokable;
    final String taskName = taskNameWithSubtask;

    final Runnable checkpointCall = new Runnable() {
        @Override
        public void run() {
            try {
                // the task keeps processing while this runs on the async-call executor
                statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp);
            }
            catch (Throwable t) {
                failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t));
            }
        }
    };

    executeAsyncCallRunnable(checkpointCall, "Checkpoint Trigger for " + taskName);
}

 

StreamTask

StreamTask就是实现了StatefulTask

所以最终调用到,

StreamTask.triggerCheckpoint,这里面会实际去做checkpoint工作
调用performCheckpoint(checkpointId, timestamp)
/**
 * Performs a checkpoint of this task: emits the checkpoint barrier downstream, snapshots the
 * state of every operator in the chain exactly once, and acknowledges the checkpoint either
 * directly or, if any state handle is asynchronous, from a separate materialization thread.
 *
 * @param checkpointId The ID of the checkpoint.
 * @param timestamp The timestamp of the checkpoint.
 * @return <code>true</code> if the checkpoint was performed, <code>false</code> if the task
 * is not running.
 */
protected boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {

    // barrier emission and state snapshot both happen while holding the lock
    synchronized (lock) {
        if (isRunning) {

            // Since both state checkpointing and downstream barrier emission occurs in this
            // lock scope, they are an atomic operation regardless of the order in which they occur.
            // Given this, we immediately emit the checkpoint barriers, so the downstream operators
            // can start their checkpoint work as soon as possible
            operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);

            // now draw the state snapshot
            final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
            final StreamTaskState[] states = new StreamTaskState[allOperators.length];

            boolean hasAsyncStates = false;

            // snapshot each operator once and note whether any part of its state is asynchronous
            for (int i = 0; i < states.length; i++) {
                StreamOperator<?> operator = allOperators[i];
                if (operator != null) {
                    StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
                    if (state.getOperatorState() instanceof AsynchronousStateHandle) {
                        hasAsyncStates = true;
                    }
                    if (state.getFunctionState() instanceof AsynchronousStateHandle) {
                        hasAsyncStates = true;
                    }
                    if (state.getKvStates() != null) {
                        for (KvStateSnapshot<?, ?, ?, ?, ?> kvSnapshot: state.getKvStates().values()) {
                            if (kvSnapshot instanceof AsynchronousKvStateSnapshot) {
                                hasAsyncStates = true;
                            }
                        }
                    }

                    states[i] = state.isEmpty() ? null : state;
                }
            }

            StreamTaskStateList allStates = new StreamTaskStateList(states);

            if (allStates.isEmpty()) {
                // nothing to store: acknowledge without state
                getEnvironment().acknowledgeCheckpoint(checkpointId);
            } else if (!hasAsyncStates) {
                // synchronous path: acknowledge right away with the collected state
                this.lastCheckpointSize = allStates.getStateSize();
                getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);
            } else {
                // asynchronous path:
                // start a Thread that does the asynchronous materialization and
                // then sends the checkpoint acknowledge
                String threadName = "Materialize checkpoint state " + checkpointId + " - " + getName();
                AsyncCheckpointThread checkpointThread = new AsyncCheckpointThread(
                        threadName, this, cancelables, states, checkpointId);

                synchronized (cancelables) {
                    cancelables.add(checkpointThread);
                }
                checkpointThread.start();
            }
            return true;
        } else {
            return false;
        }
    }
}
这里是对于source而言的checkpoint的调用逻辑,对于中间节点或sink,是要根据barrier情况,通过onEvent来触发triggerCheckpoint的

 

StreamTask.triggerCheckpoint最关键的步骤是,会对task中每个operator完成state snapshot

最终生成StreamTaskStateList allStates,保存所有的state的list

最终同步或异步的调用

getEnvironment().acknowledgeCheckpoint(checkpointId, allStates);

把state snapshot发送到Jobmanager去,后面就看看JobManager怎么处理的

同步的方式比较简单,但是一般都是需要异步的做snapshot的,

看看异步的AsyncCheckpointThread

AsyncCheckpointThread
@Override
public void run() {
try {
for (StreamTaskState state : states) {
if (state != null) {
if (state.getFunctionState() instanceof AsynchronousStateHandle) {
AsynchronousStateHandle<Serializable> asyncState = (AsynchronousStateHandle<Serializable>) state.getFunctionState();
state.setFunctionState(asyncState.materialize());
}
if (state.getOperatorState() instanceof AsynchronousStateHandle) {
AsynchronousStateHandle<?> asyncState = (AsynchronousStateHandle<?>) state.getOperatorState();
state.setOperatorState(asyncState.materialize());
}
if (state.getKvStates() != null) {
Set<String> keys = state.getKvStates().keySet();
HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
for (String key: keys) {
if (kvStates.get(key) instanceof AsynchronousKvStateSnapshot) {
AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) kvStates.get(key);
kvStates.put(key, asyncHandle.materialize()); //可以看到把真正的存储,delay到这里的materialize去做
}
}
} }
}
StreamTaskStateList allStates = new StreamTaskStateList(states);
owner.lastCheckpointSize = allStates.getStateSize();
owner.getEnvironment().acknowledgeCheckpoint(checkpointId, allStates); LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
}

 

RuntimeEnvironment

package org.apache.flink.runtime.taskmanager;
/**
 * {@link Environment} implementation excerpt showing how a checkpoint acknowledgement is
 * reported to the job manager.
 */
public class RuntimeEnvironment implements Environment {

    /**
     * Wraps the given state (if any) into an {@code AcknowledgeCheckpoint} message and sends
     * it to the job manager.
     *
     * @param checkpointId The ID of the acknowledged checkpoint.
     * @param state The state handle to report; <code>null</code> is sent as "no state" with size 0.
     * @throws RuntimeException If the state handle cannot be serialized or its size cannot be read.
     */
    @Override
    public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
        // defaults cover the "no state" case
        SerializedValue<StateHandle<?>> serializedState = null;
        long stateSize = 0;

        if (state != null) {
            // try and create a serialized version of the state handle
            try {
                serializedState = new SerializedValue<StateHandle<?>>(state);
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e);
            }

            try {
                stateSize = state.getStateSize();
            }
            catch (Exception e) {
                throw new RuntimeException("Failed to fetch state handle size", e);
            }
        }

        final AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(
                jobId,
                executionId,
                checkpointId,
                serializedState,
                stateSize);

        jobManager.tell(ack);
    }
}

所以可以看到,是把这个ack发送到job manager的,

 

JobManager

handleCheckpointMessage

/**
* Dedicated handler for checkpoint messages.
*
* @param actorMessage The checkpoint actor message.
*/
private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
actorMessage match {
case ackMessage: AcknowledgeCheckpoint =>
val jid = ackMessage.getJob()
currentJobs.get(jid) match {
case Some((graph, _)) =>
val checkpointCoordinator = graph.getCheckpointCoordinator()
val savepointCoordinator = graph.getSavepointCoordinator() if (checkpointCoordinator != null && savepointCoordinator != null) {
future { //future等待异步的ack消息
try {
if (checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)) { //JobManager收到checkpoint的ack message
// OK, this is the common case
}
else {
// Try the savepoint coordinator if the message was not addressed
// to the periodic checkpoint coordinator.
if (!savepointCoordinator.receiveAcknowledgeMessage(ackMessage)) {
log.info("Received message for non-existing checkpoint " +
ackMessage.getCheckpointId)
}
}
}
catch {
case t: Throwable =>
log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
}
}(context.dispatcher)
}

 

CheckpointCoordinator

receiveAcknowledgeMessage

/**
* Receives an AcknowledgeCheckpoint message and returns whether the
* message was associated with a pending checkpoint.
*
* @param message Checkpoint ack from the task manager
*
* @return Flag indicating whether the ack'd checkpoint was associated
* with a pending checkpoint.
*
* @throws Exception If the checkpoint cannot be added to the completed checkpoint store.
*/
public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws Exception { final long checkpointId = message.getCheckpointId(); CompletedCheckpoint completed = null;
PendingCheckpoint checkpoint; // Flag indicating whether the ack message was for a known pending
// checkpoint.
boolean isPendingCheckpoint; synchronized (lock) { checkpoint = pendingCheckpoints.get(checkpointId); //取出相应的pendingCheckpoint if (checkpoint != null && !checkpoint.isDiscarded()) {
isPendingCheckpoint = true; if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) { //根据这个ack message,对pendingCheckpoint进行ack
if (checkpoint.isFullyAcknowledged()) { //如果所有需要ack的tasks都完成ack
completed = checkpoint.toCompletedCheckpoint(); //将状态置为Completed completedCheckpointStore.addCheckpoint(completed); //将checkpoint track到completedCheckpointStore,表示完成一次完整的checkpoint pendingCheckpoints.remove(checkpointId); //从pending里面去除相应的checkpoint
rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(completed.getTimestamp()); onFullyAcknowledgedCheckpoint(completed); triggerQueuedRequests();
}
} }
} // send the confirmation messages to the necessary targets. we do this here
// to be outside the lock scope
if (completed != null) {
final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ExecutionAttemptID attemptId = ee.getAttemptId();
NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp);
ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); //通知每个ExecutionVertex,checkpoint完成
}
} statsTracker.onCompletedCheckpoint(completed);
} return isPendingCheckpoint;
}

 

PendingCheckpoint

在acknowledgeTask中,

只是把state,cache在collectedStates中,

/**
 * Records the acknowledgement of one task for this pending checkpoint.
 *
 * <p>A non-null state is only cached in {@code collectedStates}, together with the
 * time elapsed since the checkpoint timestamp.
 *
 * @param attemptID The execution attempt that acknowledges.
 * @param state The serialized state reported by the task, or <code>null</code> if it has none.
 * @param stateSize The size of the reported state.
 * @return <code>true</code> if the task was still awaited and is now acknowledged;
 * <code>false</code> if this checkpoint is discarded or the attempt was not awaited.
 */
public boolean acknowledgeTask(
        ExecutionAttemptID attemptID,
        SerializedValue<StateHandle<?>> state,
        long stateSize) {

    synchronized (lock) {
        if (discarded) {
            return false;
        }

        final ExecutionVertex ackedVertex = notYetAcknowledgedTasks.remove(attemptID);
        if (ackedVertex == null) {
            // unknown attempt, or one that has already acknowledged
            return false;
        }

        if (state != null) {
            final StateForTask taskState = new StateForTask(
                    state,
                    stateSize,
                    ackedVertex.getJobvertexId(),
                    ackedVertex.getParallelSubtaskIndex(),
                    System.currentTimeMillis() - checkpointTimestamp);
            collectedStates.add(taskState);
        }

        numAcknowledgedTasks++;
        return true;
    }
}

 

接着在收到所有的task的ack后,会调用toCompletedCheckpoint

/**
 * Converts this pending checkpoint into a {@link CompletedCheckpoint} that carries a copy of
 * the collected task states, then disposes the pending checkpoint.
 *
 * @return The completed checkpoint.
 * @throws IllegalStateException If this checkpoint is discarded, or if some tasks have not
 * acknowledged yet.
 */
public CompletedCheckpoint toCompletedCheckpoint() {
    synchronized (lock) {
        if (discarded) {
            throw new IllegalStateException("pending checkpoint is discarded");
        }
        if (!notYetAcknowledgedTasks.isEmpty()) {
            throw new IllegalStateException("Cannot complete checkpoint while not all tasks are acknowledged");
        }

        final long completionTimestamp = System.currentTimeMillis();
        final ArrayList<StateForTask> statesCopy = new ArrayList<StateForTask>(collectedStates);

        final CompletedCheckpoint result = new CompletedCheckpoint(
                jobId, checkpointId, checkpointTimestamp, completionTimestamp, statesCopy);

        dispose(null, false);
        return result;
    }
}

把collectedStates封装在CompletedCheckpoint中,返回

 

最后调用completedCheckpointStore.addCheckpoint,存储这个checkpoint,可以参考

ZooKeeperCompletedCheckpointStore

 

NotifyCheckpointComplete

通过这个NotifyCheckpointComplete消息,同样先到TaskManager,再到Task,最终调到StreamTask.notifyCheckpointComplete

/**
 * Forwards the "checkpoint complete" notification to the state backend (if it is a
 * {@link CheckpointNotifier}) and to every operator of the chain. The notification is ignored
 * when the task is not running.
 *
 * @param checkpointId The ID of the completed checkpoint.
 */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
    synchronized (lock) {
        if (!isRunning) {
            LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            return;
        }

        LOG.debug("Notification of complete checkpoint for task {}", getName());

        // We first notify the state backend if necessary
        if (stateBackend instanceof CheckpointNotifier) {
            final CheckpointNotifier backendNotifier = (CheckpointNotifier) stateBackend;
            backendNotifier.notifyCheckpointComplete(checkpointId);
        }

        // then every operator of the chain; null entries are skipped
        for (StreamOperator<?> chainedOperator : operatorChain.getAllOperators()) {
            if (chainedOperator == null) {
                continue;
            }
            chainedOperator.notifyOfCompletedCheckpoint(checkpointId);
        }
    }
}

这个就是checkpoint的完整的过程

 

再看看restore的过程

 

Restore过程

可以看到,在提交job的时候,会调用

executionGraph.restoreLatestCheckpointedState()

/**
 * Restores the latest checkpointed state by delegating to the checkpoint coordinator.
 * Does nothing when no checkpoint coordinator is set.
 *
 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 * block the job manager actor and run asynchronously.
 *
 * @throws Exception if the checkpoint coordinator fails to restore the state
 */
public void restoreLatestCheckpointedState() throws Exception {
    synchronized (progressLock) {
        if (checkpointCoordinator == null) {
            return;
        }

        // Both flags are switched off for this call; named here for readability.
        final boolean errorIfNoCheckpoint = false;
        final boolean allOrNothingState = false;

        checkpointCoordinator.restoreLatestCheckpointedState(
                getAllVertices(), errorIfNoCheckpoint, allOrNothingState);
    }
}

 

restoreLatestCheckpointedState

public void restoreLatestCheckpointedState(
Map<JobVertexID, ExecutionJobVertex> tasks,
boolean errorIfNoCheckpoint,
boolean allOrNothingState) throws Exception { synchronized (lock) { // Recover the checkpoints
//对于ZooKeeperCompletedCheckpointStore,
//Gets the latest checkpoint from ZooKeeper and removes all others.
completedCheckpointStore.recover();
// restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); //从completedCheckpointStore中取出最新的CompletedCheckpoint long recoveryTimestamp = System.currentTimeMillis(); if (allOrNothingState) { //全部成功或Nothing
Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>(); for (StateForTask state : latest.getStates()) {
ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
exec.setInitialState(state.getState(), recoveryTimestamp); //恢复state Integer count = stateCounts.get(vertex); //计数
if (count != null) {
stateCounts.put(vertex, count+1);
} else {
stateCounts.put(vertex, 1);
}
} // validate that either all task vertices have state, or none
for (Map.Entry<ExecutionJobVertex, Integer> entry : stateCounts.entrySet()) {
ExecutionJobVertex vertex = entry.getKey();
if (entry.getValue() != vertex.getParallelism()) { //如果vetex的恢复state次数不等于平行数,说明有些没有被恢复,抛异常
throw new IllegalStateException(
"The checkpoint contained state only for a subset of tasks for vertex " + vertex);
}
}
}
else {
for (StateForTask state : latest.getStates()) {
ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
exec.setInitialState(state.getState(), recoveryTimestamp);
}
}
}
}

 

Execution

/**
 * Stores the state and recovery timestamp that this execution attempt starts with.
 * Only permitted while the attempt is in {@code ExecutionState.CREATED}.
 *
 * @param initialState the serialized state handle to remember as the operator state
 * @param recoveryTimestamp the timestamp to remember for the recovery
 * @throws IllegalArgumentException if the attempt is not in the CREATED state
 */
public void setInitialState(SerializedValue<StateHandle<?>> initialState, long recoveryTimestamp) {
    final boolean inCreatedState = (state == ExecutionState.CREATED);
    if (!inCreatedState) {
        throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
    }

    this.operatorState = initialState;
    this.recoveryTimestamp = recoveryTimestamp;
}

可以看到这里的recovery,只是把我们从zk中获取的checkpoint中的状态赋值给operatorState

然后再deployToSlot,会把初始state,封装到deployment中去,提交给taskManager

/**
 * Builds the deployment descriptor for this attempt, including the operator state and
 * recovery timestamp, and submits it through the gateway.
 *
 * @param slot the slot the attempt is deployed to
 * @throws JobException declared by the signature; nothing in this excerpt throws it
 */
public void deployToSlot(final SimpleSlot slot) throws JobException {
    // The initial state travels to the task manager inside the deployment descriptor.
    final TaskDeploymentDescriptor descriptor = vertex.createDeploymentDescriptor(
            attemptId, slot, operatorState, recoveryTimestamp, attemptNumber);

    final SubmitTask submitMessage = new SubmitTask(descriptor);

    // The reply future is not consumed within this excerpt.
    final Future<Object> deployAction = gateway.ask(submitMessage, timeout);
}

 

在TaskManager中的submitTask里面,会创建Task,并执行该task,

 

Task.run()

// The very last step before the actual execution starts is injecting the state into
// the task. The state is non-null if this is an execution of a task that failed but
// had backed-up state from a checkpoint.

// Take private references onto the stack (be safe against concurrent changes).
SerializedValue<StateHandle<?>> operatorState = this.operatorState; // the state to restore
long recoveryTs = this.recoveryTs;

if (operatorState != null) {
    // State may only be handed to a stateful task.
    if (!(invokable instanceof StatefulTask)) {
        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
    }

    try {
        // Deserialize the handle with the user code class loader.
        StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
        StatefulTask<?> op = (StatefulTask<?>) invokable;
        // This is where the state is actually passed to the task.
        StateUtils.setOperatorState(op, state, recoveryTs);
    }
    catch (Exception e) {
        throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
    }
}

// Be memory and GC friendly - since the code stays in invoke() for a potentially
// long time, the references to the state handle are cleared.
//noinspection UnusedAssignment
operatorState = null;
this.operatorState = null;
 

StateUtils

/**
 * Passes a state handle to a stateful task, bridging the wildcard types to the task's
 * concrete state type through unchecked casts.
 *
 * @param op the task that receives the state
 * @param state the state handle to pass on
 * @param recoveryTimestamp the recovery timestamp forwarded together with the state
 * @param <T> the state handle type the task is treated as expecting
 * @throws Exception if the task fails while accepting the state
 */
public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op,
        StateHandle<?> state, long recoveryTimestamp) throws Exception {

    // Both casts are unchecked; the compiler cannot verify that handle and task match.
    @SuppressWarnings("unchecked")
    T castHandle = (T) state;

    @SuppressWarnings("unchecked")
    StatefulTask<T> castTask = (StatefulTask<T>) op;

    castTask.setInitialState(castHandle, recoveryTimestamp);
}

 

StreamTask

/**
 * Remembers the state and the recovery timestamp; nothing is restored at this point.
 * The state is kept in {@code lazyRestoreState}.
 *
 * @param initialState the state list to keep for the later restore
 * @param recoveryTimestamp the recovery timestamp to keep alongside it
 */
@Override
public void setInitialState(StreamTaskStateList initialState, long recoveryTimestamp) {
    this.recoveryTimestamp = recoveryTimestamp;
    // Only park the state here.
    this.lazyRestoreState = initialState;
}
/**
 * Restores the parked state into the operators of the chain. Per the original note on
 * this method, it is called from StreamTask's invoke and is where the state restore
 * really happens.
 *
 * <p>Does nothing when no state is parked in {@code lazyRestoreState}. State entry
 * {@code i} is given to operator {@code i}; pairs where either side is {@code null}
 * are skipped.
 *
 * @throws Exception wrapping any failure that occurs while restoring
 */
public void restoreStateLazy() throws Exception {
    if (lazyRestoreState == null) {
        return;
    }

    try {
        final StreamOperator<?>[] chainedOperators = operatorChain.getAllOperators();
        // Fetch all states using the user class loader.
        final StreamTaskState[] restoredStates = lazyRestoreState.getState(userClassLoader);

        // be GC friendly
        lazyRestoreState = null;

        for (int idx = 0; idx < restoredStates.length; idx++) {
            final StreamTaskState stateForOperator = restoredStates[idx];
            final StreamOperator<?> target = chainedOperators[idx];

            if (stateForOperator == null || target == null) {
                continue;
            }

            // Finally hand the state to the operator.
            target.restoreState(stateForOperator, recoveryTimestamp);
        }
    }
    catch (Exception e) {
        throw new Exception("Could not restore checkpointed state to operators and functions", e);
    }
}