Flink执行流程与源码分析

时间:2021-12-27 03:26:20

Flink执行流程与源码分析

Flink主要组件

Flink执行流程与源码分析

作业管理器(JobManager)

(1) 控制一个应用程序执行的主进程,也就是说,每个应用程序 都会被一个不同的Jobmanager所控制执行

(2) Jobmanager会先接收到要执行的应用程序,这个应用程序会包括:作业图( Job Graph)、逻辑数据流图( ogical dataflow graph)和打包了所有的类、库和其它资源的JAR包。

(3) Jobmanager会把 Jobgraph转换成一个物理层面的 数据流图,这个图被叫做 “执行图”(Executiongraph),包含了所有可以并发执行的任务。Job Manager会向资源管理器( Resourcemanager)请求执行任务必要的资源,也就是 任务管理器(Taskmanager)上的插槽slot。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 Taskmanager上。而在运行过程中Jobmanagera会负责所有需要*协调的操作,比如说检查点(checkpoints)的协调。

任务管理器(Taskmanager)

(1) Flink中的工作进程。通常在 Flink中会有多个 Taskmanageria运行, 每个 Taskmanageri都包含了一定数量的插槽( slots)。插槽的数量限制了Taskmanageri能够执行的任务数量。

(2) 启动之后, Taskmanager会向资源管理器注册它的插槽;收到资源管理器的指令后, Taskmanageri就会将一个或者多个插槽提供给Jobmanageri调用。Jobmanager就可以向插槽分配任务( tasks)来执行了。

(3) 在执行过程中, 一个 Taskmanagera可以跟其它运行同一应用程序的Taskmanager交换数据。

资源管理器(Resource Manager)

(1) 主要负责管理任务管理器( Task Manager)的 插槽(slot)Taskmanger插槽是 Flink中定义的处理资源单元。

(2) Flink 为不同的环境和资源管理工具提供了不同资源管理器,比如YARNMesos、K8s,以及 standalone部署。

(3) 当 Jobmanager申请插槽资源时, Resourcemanager会将有空闲插槽的Taskmanager?分配给Jobmanager。如果 Resourcemanagery没有足够的插槽来满足 Jobmanager的请求, 它还可以向资源提供平台发起会话,以提供启动 Taskmanager进程的容器。

分发器(Dispatcher)

(1) 可以跨作业运行,它为应用提交提供了REST接口。

(2)当一个应用被提交执行时,分发器就会启动并将应用移交给Jobmanage

(3) Dispatcher他会启动一个 WebUi,用来方便地 展示和监控作业执行的信息。

任务提交流程

Flink执行流程与源码分析

  1. 提交应用
  2. 启动并提交应用
  3. 请求slots
  4. 任务启动
  5. 注册slots
  6. 发出提供slot的指令
  7. 提供slots
  8. 提交要在slots中执行的任务
  9. 交换数据

任务提交流程(YARN)

Flink执行流程与源码分析

a. Flink任务提交后,Client向HDFS上传Flink的Jar包和配置

b. 随后向 Yarn ResourceManager提交任务ResourceManager分配 Container资源并通知对应的NodeManager启动

c. ApplicationMaster,ApplicationMaster 启动后加载Flink的Jar包和配置构建环境

d. 然后启动JobManager , 之后ApplicationMaster 向ResourceManager 申请资源启动TaskManager

e. ResourceManager 分配 Container 资源后 , 由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager

f. NodeManager 加载 Flink 的 Jar 包和配置构建环境并启动 TaskManager

g. TaskManager 启动后向 JobManager 发送心跳包,并等待 JobManager 向其分配任务。

源码分析--集群启动 JobManager 启动分析

JobManager 的内部包含非常重要的三大组件

  • WebMonitorEndpoint
  • ResourceManager
  • Dispatcher

入口,启动主类:StandaloneSessionClusterEntrypoint

  1. // 入 口 
  2. StandaloneSessionClusterEntrypoint.main() ClusterEntrypoint.runClusterEntrypoint(entrypoint); 
  3. clusterEntrypoint.startCluster();  
  4. runCluster(configuration, pluginManager); 
  5.  
  6. // 第一步:初始化各种服务 
  7.  /** 
  8.   * 初始化了 主节点对外提供服务的时候所需要的 三大核心组件启动时所需要的基础服务 
  9.   *  初始化服务,如 JobManager 的 Akka RPC 服务,HA 服务,心跳检查服务,metric service 
  10.   *  这些服务都是 Master 节点要使用到的一些服务 
  11.   *  1、commonRpcService:  基于 Akka 的 RpcService 实现。RPC 服务启动 Akka 参与者来接收从 RpcGateway 调用 RPC 
  12.   *  2、haServices:    提供对高可用性所需的所有服务的访问注册,分布式计数器和*选举 
  13.   *  3、blobServer:    负责侦听传入的请求生成线程来处理这些请求。它还负责创建要存储的目录结构 blob 或临时缓存它们 
  14.   *  4、heartbeatServices:  提供心跳所需的所有服务。这包括创建心跳接收器和心跳发送者。 
  15.   *  5、metricRegistry:   跟踪所有已注册的 Metric,它作为连接 MetricGroup 和 MetricReporter 
  16.   *  6、archivedExecutionGraphStore:   存储执行图ExecutionGraph的可序列化形式。 
  17. */ 
  18. initializeServices(configuration, pluginManager); 
  19.  
  20. // 创建 DispatcherResourceManagerComponentFactory, 初始化各种组件的 
  21. 工厂实例 
  22. // 其实内部包含了三个重要的成员变量: 
  23. // 创建 ResourceManager 的工厂实例 
  24. // 创建 Dispatcher 的工厂实例 
  25. // 创建 WebMonitorEndpoint 的工厂实例 
  26. createDispatcherResourceManagerComponentFactory(configuration); 
  27.  
  28. // 创建 集群运行需要的一些组件:Dispatcher, ResourceManager 等 
  29. // 创 建 ResourceManager 
  30. // 创 建 Dispatcher 
  31. // 创 建 WebMonitorEndpoint 
  32. clusterComponent = dispatcherResourceManagerComponentFactory.create(...) 

1. initializeServices():初始化各种服务

  1. // 初 始 化 和 启 动 AkkaRpcService, 内 部 其 实 包 装 了 一 个 ActorSystem commonRpcService = AkkaRpcServiceUtils.createRemoteRpcService(...) 
  2.  
  3. // 初始化一个负责 IO 的线程池 
  4. ioExecutor = Executors.newFixedThreadPool(...) 
  5. // 初始化 HA 服务组件,负责 HA 服务的是:ZooKeeperHaServices haServices = createHaServices(configuration, ioExecutor); 
  6.  
  7. // 初始化 BlobServer 服务端 
  8. blobServer = new BlobServer(configuration, haServices.createBlobStore()); blobServer.start(); 
  9.  
  10. // 初始化心跳服务组件, heartbeatServices = HeartbeatServices heartbeatServices = createHeartbeatServices(configuration); 
  11.  
  12. // 初始化一个用来存储 ExecutionGraph 的 Store, 实现是: 
  13. FileArchivedExecutionGraphStore 
  14. archivedExecutionGraphStore = createSerializableExecutionGraphStore(...) 

2. createDispatcherResourceManagerComponentFactory(configuration)初始化了多组件的工厂实例

  1. 1、DispatcherRunnerFactory,默认实现:DefaultDispatcherRunnerFactory  
  2.  
  3. 2、ResourceManagerFactory,默认实现:StandaloneResourceManagerFactory  
  4.  
  5. 3、RestEndpointFactory,默认实现:SessionRestEndpointFactory 
  6.  
  7. clusterComponent = dispatcherResourceManagerComponentFactory 
  8.     .create(configuration, ioExecutor, commonRpcService, haServices, 
  9.      blobServer, heartbeatServices, metricRegistry, 
  10.      archivedExecutionGraphStore, 
  11.      new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()), 
  12.      this); 

3. 创建 WebMonitorEndpoint

  1. /************************************************* 
  2.   *  创建 WebMonitorEndpoint 实例, 在 Standalone 模式下:DispatcherRestEndpoint 
  3.   *  1、restEndpointFactory = SessionRestEndpointFactory 
  4.   *  2、webMonitorEndpoint = DispatcherRestEndpoint 
  5.   *  3、highAvailabilityServices.getClusterRestEndpointLeaderElectionService() = ZooKeeperLeaderElectionService 
  6.   *  当前这个 DispatcherRestEndpoint 的作用是: 
  7.   *  1、初始化的过程中,会一大堆的 Handler 
  8.   *  2、启动一个 Netty 的服务端,绑定了这些 Handler 
  9.   *  3、当 client 通过 flink 命令执行了某些操作(发起 restful 请求), 服务端由 webMonitorEndpoint 来执行处理 
  10.   *  4、举个例子: 如果通过 flink run 提交一个 Job,那么最后是由 webMonitorEndpoint 中的 JobSubmitHandler 来执行处理 
  11.   *  5、补充一个:job 由 JobSubmitHandler 执行完毕之后,转交给 Dispatcher 去调度执行 
  12.   */ 
  13.  webMonitorEndpoint = restEndpointFactory.createRestEndpoint( 
  14.   configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever, 
  15.   blobServer, executor, metricFetcher, 
  16.   highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), 
  17.   fatalErrorHandler 
  18.  ); 

4. 创建 resourceManager

  1. /************************************************* 
  2.  *  创建 StandaloneResourceManager 实例对象 
  3.  *  1、resourceManager = StandaloneResourceManager 
  4.  *  2、resourceManagerFactory = StandaloneResourceManagerFactory 
  5. */ 
  6. resourceManager = resourceManagerFactory.createResourceManager( 
  7.  configuration, ResourceID.generate(), 
  8.  rpcService, highAvailabilityServices, heartbeatServices, 
  9.  fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), 
  10.  webMonitorEndpoint.getRestBaseUrl(), metricRegistry, hostname 
  11. ); 
  1. protected ResourceManager<ResourceID> createResourceManager( 
  2.   Configuration configuration, 
  3.   ResourceID resourceId, 
  4.   RpcService rpcService, 
  5.   HighAvailabilityServices highAvailabilityServices, 
  6.   HeartbeatServices heartbeatServices, 
  7.   FatalErrorHandler fatalErrorHandler, 
  8.   ClusterInformation clusterInformation, 
  9.   @Nullable String webInterfaceUrl, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, 
  11.   ResourceManagerRuntimeServices resourceManagerRuntimeServices) { 
  12.  
  13.  final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration); 
  14.  
  15.  /************************************************* 
  16.   *  注释: 得到一个 StandaloneResourceManager 实例对象 
  17.   */ 
  18.  return new StandaloneResourceManager( 
  19.   rpcService, 
  20.   resourceId, 
  21.   highAvailabilityServices, 
  22.   heartbeatServices, 
  23.   resourceManagerRuntimeServices.getSlotManager(), 
  24.   ResourceManagerPartitionTrackerImpl::new, 
  25.   resourceManagerRuntimeServices.getJobLeaderIdService(), 
  26.   clusterInformation, 
  27.   fatalErrorHandler, 
  28.   resourceManagerMetricGroup, 
  29.   standaloneClusterStartupPeriodTime, 
  30.   AkkaUtils.getTimeoutAsTime(configuration) 
  31.  ); 
  32.  
  33.  } 
  34.   
  1. /** 
  2. requestSlot():接受 solt请求 
  3. sendSlotReport(..): 将solt请求发送TaskManager 
  4. registerJobManager(...): 注册job管理者。 该job指的是 提交给flink的应用程序 
  5. registerTaskExecutor(...): 注册task执行者。 
  6. **/ 
  7. public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices, 
  8.   HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory, 
  9.   JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, 
  10.   ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) { 
  11.  
  12.  /************************************************* 
  13.   *  注释: 当执行完毕这个构造方法的时候,会触发调用 onStart() 方法执行 
  14.   */ 
  15.  super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null); 
  1. protected RpcEndpoint(final RpcService rpcService, final String endpointId) { 
  2.  this.rpcService = checkNotNull(rpcService, "rpcService"); 
  3.  this.endpointId = checkNotNull(endpointId, "endpointId"); 
  4.  
  5.  /************************************************* 
  6.   *  注释:ResourceManager 或者 TaskExecutor 中的 RpcServer 实现 
  7.   *  以 ResourceManager 为例说明: 
  8.   *  启动 ResourceManager 的 RPCServer 服务 
  9.   *  这里启动的是 ResourceManager 的 Rpc 服务端。 
  10.   *  接收 TaskManager 启动好了而之后, 进行注册和心跳,来汇报 Taskmanagaer 的资源情况 
  11.   *  通过动态代理的形式构建了一个Server 
  12.   */ 
  13.  this.rpcServer = rpcService.startServer(this); 

5. 在创建resourceManager同级:启动任务接收器Starting Dispatcher

  1. /************************************************* 
  2.  
  3.  *  创建 并启动 Dispatcher 
  4.  *  1、dispatcherRunner = DispatcherRunnerLeaderElectionLifecycleManager 
  5.  *  2、dispatcherRunnerFactory = DefaultDispatcherRunnerFactory 
  6.  *  第一个参数:ZooKeeperLeaderElectionService 
  7.  *  - 
  8.  *  老版本: 这个地方是直接创建一个 Dispatcher 对象然后调用 dispatcher.start() 来启动 
  9.  *  新版本: 直接创建一个 DispatcherRunner, 内部就是要创建和启动 Dispatcher 
  10.  *  - 
  11.  *  DispatcherRunner 是对 Dispatcher 的封装。 
  12.  *  DispatcherRunner被创建的代码的内部,会创建 Dispatcher并启动 
  13.  */ 
  14. log.debug("Starting Dispatcher."); 
  15. dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner( 
  16.  highAvailabilityServices.getDispatcherLeaderElectionService(), fatalErrorHandler, 
  17.  // TODO_ZYM 注释: 注意第三个参数 
  18.  new HaServicesJobGraphStoreFactory(highAvailabilityServices), 
  19.  ioExecutor, rpcService, partialDispatcherServices 
  20. ); 

Dispatcher 启动后,将会等待任务提交,如果有任务提交,则会经过submitJob(...)函数进入后续处理。

提交(一个Flink应用的提交必须经过三个graph的转换)

Flink执行流程与源码分析

首先看下一些名词

StreamGraph

是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。可以用一个 DAG 来表示),DAG 的顶点是 StreamNode,边是 StreamEdge,边包含了由哪个 StreamNode 依赖哪个 StreamNode。

  • StreamNode:用来代表 operator 的类,并具有所有相关的属性,如并发度、入边和出边等。
  • StreamEdge:表示连接两个StreamNode的边。

Flink执行流程与源码分析

DataStream 上常见的 transformation 有 map、flatmap、filter等(见DataStream Transformation了解更多)。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph

以map方法为例,看看源码

  1. public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) { 
  2.   // 通过java reflection抽出mapper的返回值类型 
  3.   TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), 
  4.       Utils.getCallLocationName(), true); 
  5.  
  6.   // 返回一个新的DataStream,SteramMap 为 StreamOperator 的实现类 
  7.   return transform("Map", outType, new StreamMap<>(clean(mapper))); 
  8.  
  9. public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { 
  10.   // read the output type of the input Transform to coax out errors about MissingTypeInfo 
  11.   transformation.getOutputType(); 
  12.  
  13.   // 新的transformation会连接上当前DataStream中的transformation,从而构建成一棵树 
  14.   OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( 
  15.       this.transformation, 
  16.       operatorName, 
  17.       operator, 
  18.       outTypeInfo, 
  19.       environment.getParallelism()); 
  20.  
  21.   @SuppressWarnings({ "unchecked""rawtypes" }) 
  22.   SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform); 
  23.  
  24.   // 所有的transformation都会存到 env 中,调用execute时遍历该list生成StreamGraph 
  25.   getExecutionEnvironment().addOperator(resultTransform); 
  26.  
  27.   return returnStream; 

map转换将用户自定义的函数MapFunction包装到StreamMap这个Operator中,再将StreamMap包装到OneInputTransformation,最后该transformation存到env中,当调用env.execute时,遍历其中的transformation集合构造出StreamGraph

JobGraph

(1) StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点。

Flink执行流程与源码分析

  • 将并不涉及到 shuffle 的算子进行合并。
  • 对于同一个 operator chain 里面的多个算子,会在同一个 task 中执行。
  • 对于不在同一个 operator chain 里的算子,会在不同的 task 中执行。

(2) JobGraph 用来由 JobClient 提交给 JobManager,是由顶点(JobVertex)、中间结果(IntermediateDataSet)和边(JobEdge)组成的 DAG 图。

(3) JobGraph 定义作业级别的配置,而每个顶点和中间结果定义具体操作和中间数据的设置。

JobVertex

JobVertex 相当于是 JobGraph 的顶点。经过优化后符合条件的多个StreamNode可能会chain在一起生成一个JobVertex,即一个JobVertex包含一个或多个operator,JobVertex的输入是JobEdge,输出是IntermediateDataSet。

IntermediateDataSet

JobVertex的输出,即经过operator处理产生的数据集。

JobEdge

job graph中的一条数据传输通道。source 是IntermediateDataSet,sink 是 JobVertex。即数据通过JobEdge由IntermediateDataSet传递给目标JobVertex。

(1) 首先是通过API会生成transformations,通过transformations会生成StreamGraph。

(2)将StreamGraph的某些StreamNode Chain在一起生成JobGraph,前两步转换都是在客户端完成。

(3)最后会将JobGraph转换为ExecutionGraph,相比JobGraph会增加并行度的概念,这一步是在Jobmanager里完成。

Flink执行流程与源码分析

ExecutionJobVertex

ExecutionJobVertex一一对应JobGraph中的JobVertex

ExecutionVertex

一个ExecutionJobVertex对应n个ExecutionVertex,其中n就是算子的并行度。ExecutionVertex就是并行任务的一个子任务

Execution

Execution 是对 ExecutionVertex 的一次执行,通过 ExecutionAttemptId 来唯一标识。

IntermediateResult

在 JobGraph 中用 IntermediateDataSet 表示 JobVertex 的对外输出,一个 JobGraph 可能有 n(n >=0) 个输出。在 ExecutionGraph 中,与此对应的就是 IntermediateResult。每一个 IntermediateResult 就有 numParallelProducers(并行度) 个生产者,每个生产者的在相应的 IntermediateResult 上的输出对应一个 IntermediateResultPartition。IntermediateResultPartition 表示的是 ExecutionVertex 的一个输出分区

ExecutionEdge

ExecutionEdge 表示 ExecutionVertex 的输入,通过 ExecutionEdge 将 ExecutionVertex 和 IntermediateResultPartition 连接起来,进而在不同的 ExecutionVertex 之间建立联系。

ExecutionGraph的构建

  1. 构建JobInformation
  2. 构建ExecutionGraph
  3. 将JobGraph进行拓扑排序,获取sortedTopology顶点集合
  1. // ExecutionGraphBuilder 
  2.  public static ExecutionGraph buildGraph( 
  3.   @Nullable ExecutionGraph prior
  4.   JobGraph jobGraph, 
  5.   ...) throws JobExecutionException, JobException { 
  6.   // 构建JobInformation 
  7.    
  8.   // 构建ExecutionGraph 
  9.    
  10.   // 将JobGraph进行拓扑排序,获取sortedTopology顶点集合 
  11.   List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); 
  12.    
  13.   executionGraph.attachJobGraph(sortedTopology); 
  14.  
  15.   return executionGraph; 
  16.  } 

构建ExecutionJobVertex,连接IntermediateResultPartition和ExecutionVertex

  1. //ExecutionGraph 
  2.  public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException { 
  3.   for (JobVertex jobVertex : topologiallySorted) { 
  4.    // 构建ExecutionJobVertex 
  5.    ExecutionJobVertex ejv = new ExecutionJobVertex( 
  6.      this, 
  7.      jobVertex, 
  8.      1, 
  9.      maxPriorAttemptsHistoryLength, 
  10.      rpcTimeout, 
  11.      globalModVersion, 
  12.      createTimestamp); 
  13.    // 连接IntermediateResultPartition和ExecutionVertex 
  14.    ev.connectToPredecessors(this.intermediateResults); 
  15.  } 
  16.    
  17.    
  18.   // ExecutionJobVertex 
  19.  public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException { 
  20.   List<JobEdge> inputs = jobVertex.getInputs(); 
  21.    
  22.   for (int num = 0; num < inputs.size(); num++) { 
  23.    JobEdge edge = inputs.get(num); 
  24.    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId()); 
  25.    this.inputs.add(ires); 
  26.    int consumerIndex = ires.registerConsumer(); 
  27.     
  28.    for (int i = 0; i < parallelism; i++) { 
  29.     ExecutionVertex ev = taskVertices[i]; 
  30.     ev.connectSource(num, ires, edge, consumerIndex); 
  31.    } 
  32.   } 
  33.  } 

拆分计划(可执行能力)

  1. // ExecutionVertex 
  2.  public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) { 
  3.  
  4.   final DistributionPattern pattern = edge.getDistributionPattern(); 
  5.   final IntermediateResultPartition[] sourcePartitions = source.getPartitions(); 
  6.  
  7.   ExecutionEdge[] edges; 
  8.  
  9.   switch (pattern) { 
  10.    // 下游 JobVertex 的输入 partition 算法,如果是 forward 或 rescale 的话为 POINTWISE 
  11.    case POINTWISE: 
  12.     edges = connectPointwise(sourcePartitions, inputNumber); 
  13.     break; 
  14.    // 每一个并行的ExecutionVertex节点都会链接到源节点产生的所有中间结果IntermediateResultPartition 
  15.    case ALL_TO_ALL: 
  16.     edges = connectAllToAll(sourcePartitions, inputNumber); 
  17.     break; 
  18.  
  19.    default
  20.     throw new RuntimeException("Unrecognized distribution pattern."); 
  21.  
  22.   } 
  23.  
  24.   inputEdges[inputNumber] = edges; 
  25.   for (ExecutionEdge ee : edges) { 
  26.    ee.getSource().addConsumer(ee, consumerNumber); 
  27.   } 
  28.  } 
  29.  
  30.  
  31.  private ExecutionEdge[] connectPointwise(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  32.   final int numSources = sourcePartitions.length; 
  33.   final int parallelism = getTotalNumberOfParallelSubtasks(); 
  34.  
  35.   // 如果并发数等于partition数,则一对一进行连接 
  36.   if (numSources == parallelism) { 
  37.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[subTaskIndex], this, inputNumber) }; 
  38.   } 
  39.   //  如果并发数大于partition数,则一对多进行连接 
  40.   else if (numSources < parallelism) { 
  41.  
  42.    int sourcePartition; 
  43.  
  44.    if (parallelism % numSources == 0) { 
  45.     int factor = parallelism / numSources; 
  46.     sourcePartition = subTaskIndex / factor; 
  47.    } 
  48.    else { 
  49.     float factor = ((float) parallelism) / numSources; 
  50.     sourcePartition = (int) (subTaskIndex / factor); 
  51.    } 
  52.  
  53.    return new ExecutionEdge[] { new ExecutionEdge(sourcePartitions[sourcePartition], this, inputNumber) }; 
  54.   } 
  55.   // 果并发数小于partition数,则多对一进行连接 
  56.   else { 
  57.    if (numSources % parallelism == 0) { 
  58.     int factor = numSources / parallelism; 
  59.     int startIndex = subTaskIndex * factor; 
  60.  
  61.     ExecutionEdge[] edges = new ExecutionEdge[factor]; 
  62.     for (int i = 0; i < factor; i++) { 
  63.      edges[i] = new ExecutionEdge(sourcePartitions[startIndex + i], this, inputNumber); 
  64.     } 
  65.     return edges; 
  66.    } 
  67.    else { 
  68.     float factor = ((float) numSources) / parallelism; 
  69.  
  70.     int start = (int) (subTaskIndex * factor); 
  71.     int end = (subTaskIndex == getTotalNumberOfParallelSubtasks() - 1) ? 
  72.       sourcePartitions.length : 
  73.       (int) ((subTaskIndex + 1) * factor); 
  74.  
  75.     ExecutionEdge[] edges = new ExecutionEdge[end - start]; 
  76.     for (int i = 0; i < edges.length; i++) { 
  77.      edges[i] = new ExecutionEdge(sourcePartitions[start + i], this, inputNumber); 
  78.     } 
  79.  
  80.     return edges; 
  81.    } 
  82.   } 
  83.  } 
  84.  
  85.  
  86.  private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) { 
  87.   ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length]; 
  88.  
  89.   for (int i = 0; i < sourcePartitions.length; i++) { 
  90.    IntermediateResultPartition irp = sourcePartitions[i]; 
  91.    edges[i] = new ExecutionEdge(irp, this, inputNumber); 
  92.   } 
  93.  
  94.   return edges; 
  95.  } 

Flink执行流程与源码分析

返回ExecutionGraph

TaskManager

TaskManager启动

  1. public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception { 
  2.         //主要初始化一堆的service,并新建一个org.apache.flink.runtime.taskexecutor.TaskExecutor 
  3.   final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration,resourceId); 
  4.   //调用TaskExecutor的start()方法 
  5.         taskManagerRunner.start(); 

TaskExecutor :submitTask()

接着的重要函数是shumitTask()函数,该函数会通过AKKA机制,向TaskManager发出一个submitTask的消息请求,TaskManager收到消息请求后,会执行submitTask()方法。(省略了部分代码)。

  1. public CompletableFuture<Acknowledge> submitTask( 
  2.    TaskDeploymentDescriptor tdd, 
  3.    JobMasterId jobMasterId, 
  4.    Time timeout) { 
  5.  
  6.     jobInformation = tdd.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); 
  7.     taskInformation = tdd.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); 
  8.     
  9.    TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(xxx); 
  10.  
  11.    InputSplitProvider inputSplitProvider = new RpcInputSplitProvider(xxx); 
  12.  
  13.    TaskManagerActions taskManagerActions = jobManagerConnection.getTaskManagerActions(); 
  14.    CheckpointResponder checkpointResponder = jobManagerConnection.getCheckpointResponder(); 
  15.  
  16.    LibraryCacheManager libraryCache = jobManagerConnection.getLibraryCacheManager(); 
  17.    ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = jobManagerConnection.getResultPartitionConsumableNotifier(); 
  18.    PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker(); 
  19.  
  20.    final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask( 
  21.     jobId, 
  22.     tdd.getAllocationId(), 
  23.     taskInformation.getJobVertexId(), 
  24.     tdd.getSubtaskIndex()); 
  25.  
  26.    final JobManagerTaskRestore taskRestore = tdd.getTaskRestore(); 
  27.  
  28.    final TaskStateManager taskStateManager = new TaskStateManagerImpl( 
  29.     jobId, 
  30.     tdd.getExecutionAttemptId(), 
  31.     localStateStore, 
  32.     taskRestore, 
  33.     checkpointResponder); 
  34.             //新建一个Task 
  35.    Task task = new Task(xxxx); 
  36.  
  37.    log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks()); 
  38.  
  39.    boolean taskAdded; 
  40.  
  41.    try { 
  42.     taskAdded = taskSlotTable.addTask(task); 
  43.    } catch (SlotNotFoundException | SlotNotActiveException e) { 
  44.     throw new TaskSubmissionException("Could not submit task.", e); 
  45.    } 
  46.  
  47.    if (taskAdded) { 
  48.        //启动任务 
  49.     task.startTaskThread(); 
  50.  
  51.     return CompletableFuture.completedFuture(Acknowledge.get()); 
  52.    }  

最后创建执行Task的线程,然后调用startTaskThread()来启动具体的执行线程,Task线程内部的run()方法承载了被执行的核心逻辑。

Task是执行在TaskExecutor进程里的一个线程,下面来看看其run方法

(1) 检测当前状态,正常情况为CREATED,如果是FAILED或CANCELING直接返回,其余状态将抛异常。

(2) 读取DistributedCache文件。

(3) 启动ResultPartitionWriter和InputGate。

(4) 向taskEventDispatcher注册partitionWriter。

(5) 根据nameOfInvokableClass加载对应的类并实例化。

(6) 将状态置为RUNNING并执行invoke方法。

  1. public void run() { 
  2.         while (true) { 
  3.             ExecutionState current = this.executionState; 
  4.             invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass); 
  5.             network.registerTask(this); 
  6.             Environment env = new RuntimeEnvironment(. . . . ); 
  7.             invokable.setEnvironment(env); 
  8.             //  actual task core work 
  9.             if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) { 
  10.             } 
  11.             // notify everyone that we switched to running 
  12.             notifyObservers(ExecutionState.RUNNING, null); 
  13.             executingThread.setContextClassLoader(userCodeClassLoader); 
  14.             // run the invokable 
  15.             invokable.invoke(); 
  16.  
  17.             if (transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) { 
  18.                 notifyObservers(ExecutionState.FINISHED, null); 
  19.             } 
  20.             Finally{ 
  21.                 // free the network resources 
  22.                 network.unregisterTask(this); 
  23.                 // free memory resources 
  24.                 if (invokable != null) { 
  25.                     memoryManager.releaseAll(invokable); 
  26.                 } 
  27.                 libraryCache.unregisterTask(jobId, executionId); 
  28.                 removeCachedFiles(distributedCacheEntries, fileCache); 

总结

整体的流程与架构可能三两张图或者三言两语就可以勾勒出画面,但是背后源码的实现是艰辛的。源码的复杂度和当初设计框架的抓狂感,我们只有想象。现在我们只是站在巨人的肩膀上去学习。

本篇的主题是"Flink架构与执行流程",做下小结,Flink on Yarn的提交执行流程:

1 Flink任务提交后,Client向HDFS上传Flink的Jar包和配置。

2 向Yarn ResourceManager提交任务。

3 ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster。

4 ApplicationMaster启动后加载Flink的Jar包和配置构建环境。

5 启动JobManager之后ApplicationMaster向ResourceManager申请资源启动TaskManager。

6 ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager。

7 NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager。

8 TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。

原文链接:https://mp.weixin.qq.com/s/iA_nMURBPMeqkSJ-kQUEjw