Spark之二:Spark任务运行的详细流程

时间:2024-03-16 22:37:44

Spark任务执行详细流程图

Spark之二:Spark任务运行的详细流程

适用于Spark Standalone集群模式下的client模式

  1. 执行启动命令:shell命令调用Spark-submit脚本
  2. 通过exec调用spark-submit类的的main方法
  3. spark-submit类通过反射来执行我们自己写的WordCount的main()方法
  4. New SparkContext(conf)是任务运行所需对象的初始化、实例化工作(整个程序想要运行的先决者,会产生DAGScheduler和TaskScheduler。)
  5. 创建TaskScheduler调度器(特质),TaskSchedulerImpl(实现类)
  6. 创建DAG的调度DAGScheduler
  7. 创建SchedulerBackend和启动:SchedulerBackend是一个调度器,在standalone的模式下实例化为StandaloneSechedulerBackend;创建StandaloneSechedulerBackend优先调用父类CoarseGrainedSchedulerBackend的start方法;创建DriverEndpoint再去创建ClientEndpoint
  8. ClientEndpoint向Master申请运行Application任务(附带有启动executor的命令)
  9. Master进行资源分配,发送指令启动executor
  10. Worker根据指定资源(core,memory)启动executor;每个Worker里面都有相应的executor,Worker启动相应executor;executor的分配是粗粒度分配(先占用资源:一直占用,释放才能使用)
  11. 各个Executor就要像DriverEndpoint注册,等待分配任务
  12. 当SparkContext创建完成之后,就开始解析代码;Drvier会记录rdd之间的依赖关系,rdd读取的数据,rdd执行的业务逻辑(传递的函数)等等相关的描述信息,开始构建DAG
  13. saveAsTextFile(output)当解析到action算子,开始准备提交任务,此时DAG构建完成
  14. 触发action之后,调用runJob()方法,最终会调用DAGScheduler的handlerJobSubmit()方法来提交job
  15. 在handlerJobSubmit()方法中,开始准备切分Stage,怎么切分??
    根据finalRDD创建finalStage,然后根据RDD之间的依赖关系,从后往前遍历
    一旦发现依赖关系是窄依赖,就把当前rdd的父rdd放到当前的stage中
    一旦发现是宽依赖,就终止当前的stage,创建新的stage继续遍历,直到某一个rdd没有父rdd了,就结束了stage切分;
    DAG最后一个rrd我们把它称为finalRDD,根据finalRDD创建一个finalStage:resultSaget=null
  16. 怎么提交Stage???
    SubmitStage(finalStage)从finalStage中开始提交,会做一个深度优先遍历
    一旦发现要提交的stage有父stage没有执行,就先提交父stage执行
    Stage0先执行,执行完成之后,在执行stage1
  17. 如何创建Task()????
    SubmitMissingTasks(stage)根据这个stage的最后一个rdd分区数量—>task数量(stage0 3)
    根据stage的类型,创建同类型的Task
    **)把一个stage中的所有task,用TaskSet包装,然后交给TaskScheduler去调度(taskScheduler.submitTask(new TaskSet))
  18. TaskScheduler要做哪些工作???
    遍历拿到的TaskSet,得到每一个Task;把每一个task进行序列化,把task发送个executor执行
    (executorData.executorEndpoint.send(launchTask(new SerializableBuffer(serializableTask))))
  19. executor执行task的流程
    接收到序列化的task之后,反序列化
    用taskRunner(tr)封装反序列化的task(tr),taskRunner实现了Runnable接口的类
    把tr交个executor拥有的线程中,等待被调度(ThreadPool)
    TaskRunner的run方法被执行,最终会调用Task的runTask方法
    会根据task类型,执行相应的runTask方法
    shuffleMapTask – runTask
    ResultTask --runTask
    当runTask方法执行的时候,才是真正的业务逻辑开始执行的时候。
  20. 读取hdfs上对应的blk块的数据,然后执行业务逻辑
  21. 当shuffle、write完成之后(Task执行写入到磁盘),stage工作就完成了
  22. 当stage0完成之后,开始执行stage1;执行stage1的顺序和stage0书序相同(17-19)
  23. 第二个stage的task的功能:shuffle、read读取对应分区的数据,执行业务逻辑
  24. 写入数据到hdfs中