MapReduce源码分析之JobSubmitter(一)

时间:2023-03-09 21:13:15
MapReduce源码分析之JobSubmitter(一)

JobSubmitter,顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外,对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的所有业务逻辑。本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter。

首先,我们先看下JobSubmitter的类成员变量,如下:

  1. // 文件系统FileSystem实例
  2. private FileSystem jtFs;
  3. // 客户端通信协议ClientProtocol实例
  4. private ClientProtocol submitClient;
  5. // 提交作业的主机名
  6. private String submitHostName;
  7. // 提交作业的主机地址
  8. private String submitHostAddress;

它一共有四个类成员变量,分别为:

1、文件系统FileSystem实例jtFs:用于操作作业运行需要的各种文件等;

2、客户端通信协议ClientProtocol实例submitClient:用于与集群交互,完成作业提交、作业状态查询等;

3、提交作业的主机名submitHostName;

4、提交作业的主机地址submitHostAddress。

其中,客户端通信协议ClientProtocol实例submitClient是通过Cluster的客户端通信协议ClientProtocol实例client来赋值的,我们在《MapReduce源码分析之新API作业提交(二):连接集群》一文中曾经提到过,它根据MapReduce中参数mapreduce.framework.name的配置为yarn或local,有Yarn模式的YARNRunner和Local模式的LocalJobRunner两种情况。
        接下来,我们再看下JobSubmitter的构造函数,如下:

  1. JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
  2. throws IOException {
  3. // 根据入参赋值成员变量submitClient、jtFs
  4. this.submitClient = submitClient;
  5. this.jtFs = submitFs;
  6. }

很简单,根据入参赋值成员变量submitClient、jtFs而已。

关键的来了,我们看下JobSubmitter唯一的对外核心功能方法submitJobInternal(),它被用于提交作业至集群,代码如下:

  1. /**
  2. * Internal method for submitting jobs to the system.
  3. *
  4. * <p>The job submission process involves:
  5. * <ol>
  6. *   <li>
  7. *   Checking the input and output specifications of the job.
  8. *   </li>
  9. *   <li>
  10. *   Computing the {@link InputSplit}s for the job.
  11. *   </li>
  12. *   <li>
  13. *   Setup the requisite accounting information for the
  14. *   {@link DistributedCache} of the job, if necessary.
  15. *   </li>
  16. *   <li>
  17. *   Copying the job's jar and configuration to the map-reduce system
  18. *   directory on the distributed file-system.
  19. *   </li>
  20. *   <li>
  21. *   Submitting the job to the <code>JobTracker</code> and optionally
  22. *   monitoring it's status.
  23. *   </li>
  24. * </ol></p>
  25. * @param job the configuration to submit
  26. * @param cluster the handle to the Cluster
  27. * @throws ClassNotFoundException
  28. * @throws InterruptedException
  29. * @throws IOException
  30. */
  31. JobStatus submitJobInternal(Job job, Cluster cluster)
  32. throws ClassNotFoundException, InterruptedException, IOException {
  33. //validate the jobs output specs
  34. // 调用checkSpecs()方法,校验作业输出路径是否配置,且是否已存在,
  35. // 正确的情况应该是已配置且未存在,输出路径配置参数为mapreduce.output.fileoutputformat.outputdir,
  36. // 之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output
  37. checkSpecs(job);
  38. // 从作业job中获取配置信息conf
  39. Configuration conf = job.getConfiguration();
  40. // 调用addMRFrameworkToDistributedCache()方法添加应用框架路径到分布式缓存中
  41. addMRFrameworkToDistributedCache(conf);
  42. // 通过JobSubmissionFiles的getStagingDir()静态方法获取作业执行时阶段区域路径jobStagingArea
  43. // 取参数yarn.app.mapreduce.am.staging-dir,参数未配置默认为/tmp/hadoop-yarn/staging
  44. // 然后后面是/提交作业用户名/.staging
  45. // 通过之前的WordCount任务的执行,我们查看历史记录,得知参数yarn.app.mapreduce.am.staging-dir配置的为/user,
  46. // 而提交作业用户名为hdfs,所以完整的路径应该为/user/hdfs/.staging
  47. Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  48. //configure the command line options correctly on the submitting dfs
  49. // 获取当前本机地址
  50. InetAddress ip = InetAddress.getLocalHost();
  51. // 确定提交作业的主机地址、主机名,并设置入配置信息conf,对应参数分别为
  52. // mapreduce.job.submithostname
  53. // mapreduce.job.submithostaddress
  54. if (ip != null) {
  55. submitHostAddress = ip.getHostAddress();
  56. submitHostName = ip.getHostName();
  57. conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
  58. conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  59. }
  60. // 生成作业ID,即JobID实例jobId
  61. JobID jobId = submitClient.getNewJobID();
  62. // 将jobId设置入job
  63. job.setJobID(jobId);
  64. // 构造提交作业路径Path实例submitJobDir,jobStagingArea后接/jobId,比如/job_1459913635503_0005
  65. // 之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005
  66. Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  67. JobStatus status = null;
  68. // 设置作业一些参数:
  69. try {
  70. // 设置mapreduce.job.user.name为当前用户,之前的WordCount示例配置的为hdfs用户
  71. conf.set(MRJobConfig.USER_NAME,
  72. UserGroupInformation.getCurrentUser().getShortUserName());
  73. // 设置hadoop.http.filter.initializers为AmFilterInitializer
  74. conf.set("hadoop.http.filter.initializers",
  75. "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
  76. // 设置mapreduce.job.dir为submitJobDir,比如/user/hdfs/.staging/job_1459913635503_0005
  77. conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
  78. LOG.debug("Configuring job " + jobId + " with " + submitJobDir
  79. + " as the submit dir");
  80. // get delegation token for the dir
  81. // 获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法
  82. TokenCache.obtainTokensForNamenodes(job.getCredentials(),
  83. new Path[] { submitJobDir }, conf);
  84. // 获取密钥和令牌,并将它们存储到令牌缓存TokenCache中
  85. populateTokenCache(conf, job.getCredentials());
  86. // generate a secret to authenticate shuffle transfers
  87. if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
  88. KeyGenerator keyGen;
  89. try {
  90. int keyLen = CryptoUtils.isShuffleEncrypted(conf)
  91. ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
  92. MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
  93. : SHUFFLE_KEY_LENGTH;
  94. keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
  95. keyGen.init(keyLen);
  96. } catch (NoSuchAlgorithmException e) {
  97. throw new IOException("Error generating shuffle secret key", e);
  98. }
  99. SecretKey shuffleKey = keyGen.generateKey();
  100. TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
  101. job.getCredentials());
  102. }
  103. // 复制并且配置相关文件
  104. copyAndConfigureFiles(job, submitJobDir);
  105. // 获取配置文件路径:job.xml
  106. Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
  107. // Create the splits for the job
  108. LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
  109. // 调用writeSplits()方法,写分片数据文件job.split和分片元数据文件job.splitmetainfo,
  110. // 并获得计算得到的map任务数目maps
  111. int maps = writeSplits(job, submitJobDir);
  112. // 配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps
  113. conf.setInt(MRJobConfig.NUM_MAPS, maps);
  114. LOG.info("number of splits:" + maps);
  115. // write "queue admins of the queue to which job is being submitted"
  116. // to job file.
  117. // 获取作业队列名queue,取参数mapreduce.job.queuename,参数未配置默认为default,
  118. // 之前的WordCount任务示例中,作业队列名queue就为default
  119. String queue = conf.get(MRJobConfig.QUEUE_NAME,
  120. JobConf.DEFAULT_QUEUE_NAME);
  121. // 获取队列的访问权限控制列表AccessControlList实例acl,通过客户端通信协议ClientProtocol实例submitClient的getQueueAdmins()方法,传入队列名queue,
  122. // 实际上之前的WordCount任务示例中,这里获取的是*
  123. AccessControlList acl = submitClient.getQueueAdmins(queue);
  124. // 配置信息中设置队列参数mapred.queue.default.acl-administer-jobs
  125. // 之前的WordCount任务示例中,该参数被设置成为*
  126. conf.set(toFullPropertyName(queue,
  127. QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());
  128. // removing jobtoken referrals before copying the jobconf to HDFS
  129. // as the tasks don't need this setting, actually they may break
  130. // because of it if present as the referral will point to a
  131. // different job.
  132. // 清空缓存的令牌
  133. TokenCache.cleanUpTokenReferral(conf);
  134. // 根据参数确定是否需要追踪令牌ID
  135. // 取参数mapreduce.job.token.tracking.ids.enabled,参数未配置默认为false
  136. if (conf.getBoolean(
  137. MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
  138. MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
  139. // 通过job获取令牌ID,并存储到trackingIds列表中
  140. // Add HDFS tracking ids
  141. ArrayList<String> trackingIds = new ArrayList<String>();
  142. for (Token<? extends TokenIdentifier> t :
  143. job.getCredentials().getAllTokens()) {
  144. trackingIds.add(t.decodeIdentifier().getTrackingId());
  145. }
  146. // 将trackingIds列表中的内容设置到参数mapreduce.job.token.tracking.ids中
  147. conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
  148. trackingIds.toArray(new String[trackingIds.size()]));
  149. }
  150. // Set reservation info if it exists
  151. // 如有必要,设置存在的预订信息
  152. // 参数为mapreduce.job.reservation.id
  153. ReservationId reservationId = job.getReservationId();
  154. if (reservationId != null) {
  155. conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
  156. }
  157. // Write job file to submit dir
  158. // 调用writeConf()方法,写入作业配置信息至文件job.xml
  159. writeConf(conf, submitJobFile);
  160. //
  161. // Now, actually submit the job (using the submit name)
  162. // 调用printTokens()方法打印令牌信息到Log文件
  163. printTokens(jobId, job.getCredentials());
  164. // 通过客户端通信协议ClientProtocol实例submitClient的submitJob()方法提交作业,
  165. // 并获取作业状态JobStatus实例status
  166. // 由集群连接一文的分析我们可以知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象,
  167. // 最终调用的是二者的submitJob()方法,我们留待以后分析
  168. status = submitClient.submitJob(
  169. jobId, submitJobDir.toString(), job.getCredentials());
  170. // 如果作业状态JobStatus实例status不为null,直接返回,否则抛出无法加载作业的IO异常
  171. if (status != null) {
  172. return status;
  173. } else {
  174. throw new IOException("Could not launch job");
  175. }
  176. } finally {
  177. // 最终,抛出无法加载作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法,
  178. // 删除作业提交的相关目录或文件submitJobDir
  179. if (status == null) {
  180. LOG.info("Cleaning up the staging area " + submitJobDir);
  181. if (jtFs != null && submitJobDir != null)
  182. jtFs.delete(submitJobDir, true);
  183. }
  184. }
  185. }

submitJobInternal()方法篇幅比较长,逻辑也很复杂,本文先介绍下它的大体逻辑,后续分文会介绍各个环节的详细内容,且下面涉及到的之前WordCount作业示例在《Hadoop2.6.0版本MapReudce示例之WordCount(一)》及其姊妹篇中,敬请注意!submitJobInternal()方法大体逻辑如下:

1、调用checkSpecs()方法,校验作业输出路径是否配置,且是否已存在:

正确的情况应该是已配置且未存在,输出路径配置参数为mapreduce.output.fileoutputformat.outputdir,之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output;

2、从作业job中获取配置信息conf;

3、调用addMRFrameworkToDistributedCache()方法添加应用框架路径到分布式缓存中;

4、通过JobSubmissionFiles的getStagingDir()静态方法获取作业执行时阶段区域路径jobStagingArea:

取参数yarn.app.mapreduce.am.staging-dir,参数未配置默认为/tmp/Hadoop-yarn/staging,然后后面是/提交作业用户名/.staging,通过之前的WordCount任务的执行,我们查看历史记录,得知参数yarn.app.mapreduce.am.staging-dir配置的为/user,而提交作业用户名为hdfs,所以完整的路径应该为/user/hdfs/.staging;

5、获取当前本机地址ip;

6、确定提交作业的主机地址、主机名,并设置入配置信息conf,对应参数分别为mapreduce.job.submithostname、mapreduce.job.submithostaddress;

7、生成作业ID,即JobID实例jobId:

通过客户端通信协议ClientProtocol实例submitClient的getNewJobID()方法生成作业ID,即JobID实例jobId;

8、 将jobId设置入job;

9、构造提交作业路径Path实例submitJobDir:

jobStagingArea后接/jobId,比如/job_1459913635503_0005,之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005;

10、设置作业一些参数:

10.1、设置mapreduce.job.user.name为当前用户,之前的WordCount示例配置的为hdfs用户;

10.2、设置hadoop.http.filter.initializers为AmFilterInitializer;

10.3、设置mapreduce.job.dir为submitJobDir,比如/user/hdfs/.staging/job_1459913635503_0005;

11、获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法;

12、通过populateTokenCache()方法获取密钥和令牌,并将它们存储到令牌缓存TokenCache中;

14、复制并且配置相关文件:通过copyAndConfigureFiles()方法实现;

15、获取配置文件路径:job.xml;

16、调用writeSplits()方法,写分片数据文件job.split和分片元数据文件job.splitmetainfo,并获得计算得到的map任务数目maps;

17、配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps;

18、获取作业队列名queue,取参数mapreduce.job.queuename,参数未配置默认为default,之前的WordCount任务示例中,作业队列名queue就为default;

19、获取队列的访问权限控制列表AccessControlList实例acl:

通过客户端通信协议ClientProtocol实例submitClient的getQueueAdmins()方法,传入队列名queue,实际上之前的WordCount任务示例中,这里获取的是*;

20、配置信息中设置队列参数mapred.queue.default.acl-administer-jobs,之前的WordCount任务示例中,该参数被设置成为*;

21、清空缓存的令牌:通过TokenCache的cleanUpTokenReferral()方法实现;

22、根据参数确定是否需要追踪令牌ID,如果需要的话:

取参数mapreduce.job.token.tracking.ids.enabled,参数未配置默认为false,通过job获取令牌ID,并存储到trackingIds列表中,将trackingIds列表中的内容设置到参数mapreduce.job.token.tracking.ids中;

23、如有必要,设置存在的预订信息:参数为mapreduce.job.reservation.id;

24、调用writeConf()方法,写入作业配置信息至文件job.xml;

25、调用printTokens()方法打印令牌信息到Log文件;

26、通过客户端通信协议ClientProtocol实例submitClient的submitJob()方法提交作业,并获取作业状态JobStatus实例status:

由集群连接一文的分析我们可以知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象,最终调用的是二者的submitJob()方法,我们留待以后分析;

27、如果作业状态JobStatus实例status不为null,直接返回,否则抛出无法加载作业的IO异常:

最终,抛出无法加载作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法,删除作业提交的相关目录或文件submitJobDir。

整体流程如上,对于关键步骤的主要细节,限于篇幅,敬请关注《MapReduce源码分析之JobSubmitter(二)》!