文章目录
- 1. 集群内部通讯方法概述
- 2. TaskManager向ResourceManager注册RPC服务
- 3. JobMaster向ResourceManager申请Slot计算资源
现在我们已经知道Flink中RPC通信框架的底层设计与实现,接下来通过具体的实例了解集群运行时中组件如何基于RPC通信框架构建相互之间的调用关系。
1. 集群内部通讯方法概述
通过RegisteredRpcConnection进行连接注册与通讯(维护心跳等)
当TaskExecutor启动后,会立即向ResourceManager中注册当前TaskManager的信息。同样,JobMaster组件启动后也立即会向ResourceManager注册JobMaster的信息。这些注册操作实际上就是在构建集群中各个组件之间的RPC连接,这里的注册连接在Flink中被称为RegisteredRpcConnection,集群组件之间的RPC通信都会通过创建RegisteredRpcConnection进行,例如获取RpcEndpoint对应的RpcGateway接口以及维护组件之间的心跳连接等。
如下图,集群运行时中各组件的注册连接主要如下三种RegisteredRpcConnection的实现。
- JobManagerRegisteredRpcConnection:用于管理TaskManager中与JobManager之间的RPC连接。
- ResourceManagerConnection:用于管理JobManager中与ResourceManager之间的RPC连接。
- TaskExecutorToResourceManagerConnection:用于管理TaskExecutor中与ResourceManager之间的RPC连接。
如下图再有:
- RegisteredRpcConnection提供了generateRegistration()抽象方法,主要用于生成组件之间的RPC连接,每次调用RegisteredRpcConnection.start()方法启动RegisteredRpcConnection时,都会创建新的RetryingRegistration。
不同RegisteredRpcConnection创建的RetryingRegistration也会有所不同,例如在TaskExecutorToResourceManagerConnection中就会创建ResourceManagerRegistration实例。
- 调用rpcService.connect(targetAddress, targetType) ,返回RpcGateway的代理对象,通过RpcGateway连接到目标RpcEndpoint上。
- 在RetryingRegistration中会提供invokeRegistration()抽象方法,用于实现子类的RPC注册操作。
例如在ResourceManagerRegistration中会实现invokeRegistration()方法,在方法中调用resourceManager.registerTaskExecutor()将TaskExecutor信息注册到ResourceManager中,这里的ResourceManager就是ResourceManagerGateway接口代理类。
- 调用onRegistrationSuccess()方法继续后续操作,例如在JobManagerRegisteredRpcConnection中会向jobLeaderListener添加当前的jobId等信息。
- 如果当前组件没有成功到注册至目标组件时,会调用onRegistrationFailure()抽象方法继续后续操作,包括连接重连或停止整个RpcEndpoint对应的服务。
接着以TaskManager向ResourceManager注册RPC服务为例,介绍整个RPC连接的注册过程。
2. TaskManager向ResourceManager注册RPC服务
TaskManager向ResourceManager注册RPC服务的过程如图所示。
- TaskExecutor节点正常启动后,调用RpcEndpoint.onStart()方法初始化并启动TaskExecutor组件的内部服务。
- 创建监听服务
- TaskExecutor调用resourceManagerLeaderRetriever.start()方法,启动ResourceManager组件
领导节点的监听服务
并传入ResourceManagerLeaderListener,用于监听ResourceManager的领导节点的变化情况。- 当ResourceManagerLeaderListener接收到来自ResourceManager的leaderAddress以及leaderSessionID的信息后,调用notifyOfNewResourceManagerLeader()方法通知TaskExecutor和新的ResourceManagerLeader建立RPC连接。
- 创建与ResourceManager组件的RPC网络连接
a. 调用TaskExecutor.reconnectToResourceManager()内部方法,创建与ResourceManager组件之间的RPC网络连接。
b. 在reconnectToResourceManager()方法中会事先调用closeResourceManagerConnection()方法关闭之前的ResourceManager连接,然后依次调用tryConnectToResourceManager()和connectToResourceManager()方法创建与ResourceManager节点的RPC连接。
- 创建TaskExecutorRegistration对象
在connectToResourceManager()方法中会创建TaskExecutorRegistration对象,用于存储TaskManager的注册信息,其中包括taskExecutorAddress、resourceId以及dataPort等连接信息,同时还包含hardwareDescription、defaultSlotResourceProfile以及totalResourceProfile等资源描述信息。
- 正式建立网络连接
创建TaskExecutorToResourceManagerConnection实例,正式与ResourceManager建立RPC网络连接,同时调用TaskExecutorToResourceManagerConnection.start()方法启动RPC连接。实际上调用的是RegisteredRpcConnection.start()方法。
- 创建新的创建新的Registration与其他组件的RPC连接
在RegisteredRpcConnection中会调用内部方法createNewRegistration()创建新的Registration。而在createNewRegistration()方法中会调用generateRegistration()子类方法,创建与其他组件之间的RPC连接。这里主要调用的是TaskExecutorToResourceManagerConnection.generateRegistration()方法。
- 调用RetryingRegistration.startRegistration()方法注册具体的RPC连接,实际上会调用AkkaRpcService.connect()方法创建并获取ResourceManager对应的RpcGateway接口。
- 调用ResourceManagerGateway.registerTaskExecutor()方法,最终完成在ResourceManager中注册TaskManager的操作。创建的TaskExecutorRegistration同时会传递给ResourceManager。
- 当ResourceManager接收到TaskManager的注册信息后,会在本地维护TaskManager的注册信息,并建立与TaskManager组件之间的心跳连接,至此完成了TaskManager启动后向ResourceManager进行RPC网络连接注册的全部流程。
如代码所示
- TaskExecutor.connectToResourceManager()方法中首先会创建TaskExecutorRegistration注册信息和TaskExecutorToResourceManagerConnection对象。
- 然后调用TaskExecutorToResourceManagerConnection.start()方法启动TaskManager和ResourceManager之间的RPC注册连接。
private void connectToResourceManager() {
assert(resourceManagerAddress != null);
assert(establishedResourceManagerConnection == null);
assert(resourceManagerConnection == null);
log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
// TaskExecutor注册信息
final TaskExecutorRegistration taskExecutorRegistration =
new TaskExecutorRegistration(
getAddress(),
getResourceID(),
taskManagerLocation.dataPort(),
hardwareDescription,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile()
);
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
getRpcService(),
taskManagerConfiguration.getRetryingRegistrationConfiguration(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
getMainThreadExecutor(),
new ResourceManagerRegistrationListener(),
taskExecutorRegistration);
resourceManagerConnection.start();
}
接着看RegisteredRpcConnection.start()的代码逻辑,如代码所示。
public void start() {
checkState(!closed, "The RPC connection is already closed");
checkState(!isConnected() && pendingRegistration == null,
"The RPC connection is already started");
// 创建RetryingRegistration
final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
// 启动RetryingRegistration
if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
newRegistration.startRegistration();
} else {
// 并行启动后,直接取消当前Registration
newRegistration.cancel();
}
}
关注:RetryingRegistration.startRegistration()逻辑。
- 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway。
- 创建RpcGateway代理类后,就可以连接到指定的RpcEndpoint了。对于rpcService.connect()方法的定义,我们已经在RPC底层原理中介绍过。
- 创建RPC连接后,尝试注册操作。
- 如果注册失败,则进行Retry操作,除非接收到取消操作的消息。
public void startRegistration() {
if (canceled) {
return;
}
try {
final CompletableFuture<G> rpcGatewayFuture;
// 根据不同的targetType,选择创建FencedRpcGateway还是普通的RpcGateway
if (FencedRpcGateway.class.isAssignableFrom(targetType)) {
rpcGatewayFuture = (CompletableFuture<G>) rpcService.connect(
targetAddress,
fencingToken,
targetType.asSubclass(FencedRpcGateway.class));
} else {
rpcGatewayFuture = rpcService.connect(targetAddress, targetType);
}
// 成功获取网关后,尝试注册操作
CompletableFuture<Void> rpcGatewayAcceptFuture =
rpcGatewayFuture.thenAcceptAsync(
(G rpcGateway) -> {
log.info("Resolved {} address, beginning registration",
targetName);
register(rpcGateway, 1, retryingRegistrationConfiguration.
getInitialRegistrationTimeoutMillis());
},
rpcService.getExecutor());
// 如果注册失败,则进行Retry操作,除非取消操作
rpcGatewayAcceptFuture.whenCompleteAsync(
(Void v, Throwable failure) -> {
if (failure != null && !canceled) {
final Throwable strippedFailure =
ExceptionUtils.stripCompletionException(failure);
// 间隔指定时间后再次注册
startRegistrationLater(retryingRegistrationConfiguration.
getErrorDelayMillis());
}
},
rpcService.getExecutor());
}
catch (Throwable t) {
completionFuture.completeExceptionally(t);
cancel();
}
}
继续了解ResourceManagerRegistration.invokeRegistration()的具体实现。
该方法会创建
和ResourceManagerGateway之间的连接以及注册操作
,
resourceManager会接收来自TaskExecutor的注册信息,并根据taskExecutorRegistration提供的注册信息,将TaskExecutor信息记录在ResourceManager的本地存储中,然后开启TaskExecutor之间的心跳连接。至此,TaskManager能和ResourceManager进行正常的RPC通信了。
protected CompletableFuture<RegistrationResponse> invokeRegistration(
ResourceManagerGateway resourceManager, ResourceManagerId fencingToken,
long timeoutMillis) throws Exception {
Time timeout = Time.milliseconds(timeoutMillis);
return resourceManager.registerTaskExecutor(
taskExecutorRegistration,
timeout);
}
对于其他组件之间的RpcConnection注册操作,例如TaskManager与JobMaster之间的RPC连接注册,基本上和ResourceManagerRegistration一样,这里暂不介绍。
接下来我们看JobMaster是如何向ResourceManager申请Slot计算资源的。
3. JobMaster向ResourceManager申请Slot计算资源
当JobMaster组件启动后,
- 会(调用JobMaster.startJobMasterServices())启动JobMaster中的内部服务,其中就包括了SlotPool组件。
- 同时会创建和启动JobMaster与ResourceManager之间的RPC连接ResourceManagerConnection。创建成功后,会执行包括向ResourceManager发送申请Slot计算资源的RPC请求等后续操作。
如代码所示
//从SlotPoolImpl.connectToResourceManager()可以看出,方法中分别遍历
//waitingForResourceManager集合中的PendingRequest,
//然后就每个PendingRequest调用requestSlotFromResourceManager()方法向
//ResourceManager申请PendingRequest中指定的Slot计算资源。
public void connectToResourceManager(
@Nonnull ResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
waitingForResourceManager.clear();
}
继续看SlotPoolImpl.requestSlotFromResourceManager()方法的实现,如下代码所示。
- 创建AllocationID并将pendingRequest和AllocationID存储在pendingRequests集合中。
- 判断pendingRequest是否出现异常或已经分配了其他AllocationID,如果出现异常或已分配则取消当前pendingRequest中的资源分配请求。
- 调用resourceManagerGateway.requestSlot()远程RPC方法向ResourceManager申请Slot计算资源,此时会在方法中创建SlotRequest对象,指定申请Slot计算资源的具体参数。
- ResourceManager接收到SlotPool发送的SlotRequest请求后,会将SlotRequest转发给SlotManager进行处理,此时如果能正常分配到Slot资源,则会返回Acknowledge信息。
- 调用FutureUtils.whenCompleteAsyncIfNotDone()方法执行返回rmResponse CompletableFuture的对象,此时如果Slot资源申请过程出现异常,则调用slotRequestToResourceManager-Failed()方法进行处理。
private void requestSlotFromResourceManager(
final ResourceManagerGateway resourceManagerGateway,
final PendingRequest pendingRequest) {
checkNotNull(resourceManagerGateway);
checkNotNull(pendingRequest);
log.info("Requesting new slot [{}] and profile {} from resource manager.",
pendingRequest.getSlotRequestId(), pendingRequest.
getResourceProfile());
final AllocationID allocationId = new AllocationID();
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId,
pendingRequest);
pendingRequest.getAllocatedSlotFuture().whenComplete(
(AllocatedSlot allocatedSlot, Throwable throwable) -> {
if (throwable != null
|| !allocationId.equals(allocatedSlot.getAllocationId())) {
resourceManagerGateway.cancelSlotRequest(allocationId);
}
});
CompletableFuture<Acknowledge> rmResponse =
resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationId,
pendingRequest.getResourceProfile(), jobManagerAddress),
rpcTimeout);
FutureUtils.whenCompleteAsyncIfNotDone(
rmResponse,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable failure) -> {
if (failure != null) {
slotRequestToResourceManagerFailed(pendingRequest.
getSlotRequestId(), failure);
}
});
}
从以上实例可以看出,集群运行时中各个组件之间都是基于RPC通信框架相互访问的。RpcEndpoint组件会创建与其他RpcEndpoint之间的RegisteredRpcConnection,并通过RpcGateway接口的动态代理类与其他组件进行通信。
需要注意的是,Flink把Akka作为RPC底层的通信框架,但没有使用Akka其他丰富的监督功能,并且未来有去掉Akka依赖的可能。
参考:《Flink设计与实现:核心原理与源码解析》–张利兵