Hadoop源码解析之YARN客户端作业提交流程

时间:2020-12-15 08:32:15

1. 简介

hadoop在1.x中是向JobTracker提交,而在2.x中换成了ResourceManager,客户端的代理对象也有所变动,换成了YarnRunner,但大致流程和1类似,主要的流程集中在JobSubmitter.submitJobInternal中,包括检测输出目录合法性,设置作业提交信息(主机和用户),获得JobID,向HDFS中拷贝作业所需文件(Job.jar Job.xml split文件等),最后执行作业提交。

2.源码解析

waitForCompletion函数用于提交作业,循环监控作业状态

public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException,   ClassNotFoundException {  
if (state == JobState.DEFINE) {
submit();//提交
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis = Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
}
主要分析submit函数,来看作业是如何提交的,主要分为两个阶段

1、连接master 

2、作业提交

public void submit() throws IOException, InterruptedException, ClassNotFoundException {  
ensureState(JobState.DEFINE);
setUseNewAPI();
//连接RM
connect();
final JobSubmitter submitter = getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
//提交作业
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
连接master时会建立Cluster实例,下面是Cluster构造函数,其中重点初始化部分

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)   throws IOException {  
this.conf = conf;
this.ugi = UserGroupInformation.getCurrentUser();
initialize(jobTrackAddr, conf);
}

创建客户端代理阶段用到了java.util.ServiceLoader,目前2.5.2版本包含两个LocalClientProtocolProvider(本地作业) YarnClientProtocolProvider(Yarn作业),此处会根据mapreduce.framework.name的配置创建相应的客户端

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)  throws IOException {  
synchronized (frameworkLoader) {
for (ClientProtocolProvider provider : frameworkLoader) {
LOG.debug("Trying ClientProtocolProvider : " + provider.getClass().getName());
ClientProtocol clientProtocol = null;
try {
if (jobTrackAddr == null) {
//创建YARNRunner对象
clientProtocol = provider.create(conf);
} else {
clientProtocol = provider.create(jobTrackAddr, conf);
}
//初始化Cluster内部成员变量
if (clientProtocol != null) {
clientProtocolProvider = provider;
client = clientProtocol;
LOG.debug("Picked " + provider.getClass().getName() + " as the ClientProtocolProvider");
break;
}
else {
LOG.debug("Cannot pick " + provider.getClass().getName() + " as the ClientProtocolProvider - returned null protocol");
}
}
catch (Exception e) {
LOG.info("Failed to use " + provider.getClass().getName()
+ " due to error: " + e.getMessage());
}
}
}

if (null == clientProtocolProvider || null == client) {
throw new IOException(
"Cannot initialize Cluster. Please check your configuration for "
+ MRConfig.FRAMEWORK_NAME
+ " and the correspond server addresses.");
}
}

Yarn模式下,如果参数mapreduce.framework.name配置的为yarn,构造一个YARNRunner实例并返回,否则返回null .
Local模式也是需要看参数mapreduce.framework.name的配置是否为local,是的话,返回LocalJobRunner实例,并设置map任务数量为1,否则返回null,值得一提的是,这里参数mapreduce.framework.name未配置的话,默认为local,也就是说,MapReduce需要看参数mapreduce.framework.name确定连接模式,但默认是Local模式的。
        到了这里,我们就能够知道一个很重要的信息,Cluster中客户端通信协议ClientProtocol实例,要么是Yarn模式下的YARNRunner,要么就是Local模式下的LocalJobRunner,记住这点,对透彻了解MapReduce作业提交的整体流程非常重要。
        好了,我们继续以Yarn模式来分析MapReduce集群连接,看下YARNRunner的实现,先看下它的成员变量,如下:


// 记录工厂RecordFactory实例  
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

// ResourceManager代理ResourceMgrDelegate实例
private ResourceMgrDelegate resMgrDelegate;

// 客户端缓存ClientCache实例
private ClientCache clientCache;

// 配置信息Configuration实例
private Configuration conf;

// 文件上下文FileContext实例
private final FileContext defaultFileContext;

其中,最重要的一个变量就是ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息,其内部有一个YarnClient实例YarnClient,负责与Yarn进行通信,还有ApplicationId、ApplicationSubmissionContext等与特定应用程序相关的成员变量。
        另外一个比较重要的变量就是客户端缓存ClientCache实例clientCache,
        接下来,我们看下YARNRunner的create函数,如下:


public ClientProtocol create(Configuration conf) throws IOException {  
if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
return new YARNRunner(conf);
}
return null;
}
创建客户端代理的流程如下:
Cluster->ClientProtocol(YarnRunner)->ResourceMgrDelegate->client(YarnClientImpl)->rmClient(ApplicationClientProtocol)
在YarnClientImpl的serviceStart阶段会创建RPC代理,注意其中的协议。

protected void serviceStart() throws Exception {  
try {
rmClient = ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
YarnRunner的构造函数如下:

public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,  
ClientCache clientCache) {
this.conf = conf;
try {
this.resMgrDelegate = resMgrDelegate;
this.clientCache = clientCache;
this.defaultFileContext = FileContext.getFileContext(this.conf);
} catch (UnsupportedFileSystemException ufe) {
throw new RuntimeException("Error in instantiating YarnClient", ufe);
}
}

下面看最核心的提交部分JobSubmitter.submitJobInternal

JobStatus submitJobInternal(Job job, Cluster cluster)   
throws ClassNotFoundException, InterruptedException, IOException {
//检测输出目录合法性,是否已存在,或未设置
checkSpecs(job);
Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf);
//获得登录区,用以存放作业执行过程中用到的文件,默认位置/tmp/hadoop-yarn/staging/username/.staging
//可通过yarn.app.mapreduce.am.staging-dir修改
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//主机名和地址设置
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
//获取新的JobID,此处需要RPC调用
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
//获取提交目录:/tmp/hadoop-yarn/staging/username/.staging/job_***
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf);

populateTokenCache(conf, job.getCredentials());

// generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
//向集群中拷贝所需文件,下面会单独分析(1)
copyAndConfigureFiles(job, submitJobDir);
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

// 写分片文件job.split job.splitmetainfo,
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);

// write "queue admins of the queue to which job is being submitted"
// to job file.
//设置队列名
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);

if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
}
// Write job file to submit dir
//写入job.xml
writeConf(conf, submitJobFile);

// Now, actually submit the job (using the submit name)
//这里才开始真正提交,见下面分析(2)
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true);
}
}
}

(1)文件拷贝过程如下,默认副本数为10

private void copyAndConfigureFiles(Job job, Path jobSubmitDir)   
throws IOException {
Configuration conf = job.getConfiguration();
short replication = (short)conf.getInt(Job.SUBMIT_REPLICATION, 10);
//开始拷贝
copyAndConfigureFiles(job, jobSubmitDir, replication);
// Set the working directory
if (job.getWorkingDirectory() == null) {
job.setWorkingDirectory(jtFs.getWorkingDirectory());
}
}

下面是具体文件拷贝过程,注释里写的也比较清楚了

// configures -files, -libjars and -archives.  
private void copyAndConfigureFiles(Job job, Path submitJobDir,
short replication) throws IOException {
Configuration conf = job.getConfiguration();
if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
LOG.warn("Hadoop command-line option parsing not performed. " +
"Implement the Tool interface and execute your application " +
"with ToolRunner to remedy this.");
}
// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
//
// Figure out what fs the JobTracker is using. Copy the
// job to it, under a temporary name. This allows DFS to work,
// and under the local fs also provides UNIX-like object loading
// semantics. (that is, if the job file is deleted right after
// submission, we can still run the submission to completion)
//

// Create a number of filenames in the JobTracker's fs namespace
LOG.debug("default FileSystem: " + jtFs.getUri());
if (jtFs.exists(submitJobDir)) {
throw new IOException("Not submitting job. Job directory " + submitJobDir
+" already exists!! This is unexpected.Please check what's there in" +
" that directory");
}
submitJobDir = jtFs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
//创建工作目录
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir);
Path archivesDir = JobSubmissionFiles.getJobDistCacheArchives(submitJobDir);
Path libjarsDir = JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir);
// add all the command line files/ jars and archive
// first copy them to jobtrackers filesystem
//建立上述所需目录
if (files != null) {
FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms);
String[] fileArr = files.split(",");
for (String tmpFile: fileArr) {
URI tmpURI = null;
try {
tmpURI = new URI(tmpFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(filesDir, tmp, conf, replication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheFile(pathURI, conf);
} catch(URISyntaxException ue) {
//should not throw a uri exception
throw new IOException("Failed to create uri for " + tmpFile, ue);
}
}
}

if (libjars != null) {
FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms);
String[] libjarsArr = libjars.split(",");
for (String tmpjars: libjarsArr) {
Path tmp = new Path(tmpjars);
Path newPath = copyRemoteFiles(libjarsDir, tmp, conf, replication);
DistributedCache.addFileToClassPath(
new Path(newPath.toUri().getPath()), conf);
}
}

if (archives != null) {
FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms);
String[] archivesArr = archives.split(",");
for (String tmpArchives: archivesArr) {
URI tmpURI;
try {
tmpURI = new URI(tmpArchives);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
Path tmp = new Path(tmpURI);
Path newPath = copyRemoteFiles(archivesDir, tmp, conf,
replication);
try {
URI pathURI = getPathURI(newPath, tmpURI.getFragment());
DistributedCache.addCacheArchive(pathURI, conf);
} catch(URISyntaxException ue) {
//should not throw an uri excpetion
throw new IOException("Failed to create uri for " + tmpArchives, ue);
}
}
}


if (jobJar != null) { // copy jar to JobTracker's fs
// use jar name if job is not named.
if ("".equals(job.getJobName())){
job.setJobName(new Path(jobJar).getName());
}
Path jobJarPath = new Path(jobJar);
URI jobJarURI = jobJarPath.toUri();
// If the job jar is already in fs, we don't need to copy it from local fs
if (jobJarURI.getScheme() == null || jobJarURI.getAuthority() == null
|| !(jobJarURI.getScheme().equals(jtFs.getUri().getScheme())
&& jobJarURI.getAuthority().equals(
jtFs.getUri().getAuthority()))) {
//拷贝wordcount.jar,注意拷贝过去后会重命名为job.jar,副本数为10
copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir),
replication);
job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString());
}
} else {
LOG.warn("No job jar file set. User classes may not be found. "+
"See Job or Job#setJar(String).");
}

// set the timestamps of the archives and files
// set the public/private visibility of the archives and files
ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(conf);
// get DelegationToken for each cached file
ClientDistributedCacheManager.getDelegationTokens(conf, job
.getCredentials());
}
(2)真正的作业提交部分

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)  
throws IOException, InterruptedException {

addHistoryToken(ts);

// Construct necessary information to start the MR AM
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);


// Submit to ResourceManager
try {
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);


ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}

真正提交的代码是ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);

进入ResourceMgrDelegate类的submitApplication函数:

public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
return client.submitApplication(appContext);
}
这里的client变量是YarnClientImpl类的一个实例,进入YarnClientImpl类:

public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
if (applicationId == null) {
throw new ApplicationIdNotProvidedException(
"ApplicationId is not provided in ApplicationSubmissionContext");
}
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);

// Automatically add the timeline DT into the CLC
// Only when the security and the timeline service are both enabled
if (isSecurityEnabled() && timelineServiceEnabled) {
addTimelineDelegationToken(appContext.getAMContainerSpec());
}

//TODO: YARN-1763:Handle RM failovers during the submitApplication call.
rmClient.submitApplication(request);

int pollCount = 0;
long startTime = System.currentTimeMillis();

while (true) {
try {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break;
}

long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}

// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application "
+ applicationId
+ " to be successfully submitted.");
}
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
// ApplicationState
LOG.info("Re-submit application " + applicationId + "with the " +
"same ApplicationSubmissionContext");
rmClient.submitApplication(request);
}
}

return applicationId;
}
提交的代码为rmClient.submitApplication(request);

rmClient变量初始化的地方:

protected void serviceStart() throws Exception {
try {
rmClient = ClientRMProxy.createRMProxy(getConfig(),
ApplicationClientProtocol.class);
if (historyServiceEnabled) {
historyClient.start();
}
if (timelineServiceEnabled) {
timelineClient.start();
}
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
可以看到rmClient是一个ClientRMProxy创建的代理,对应的实现是ApplicationClientProtocolPBClientImpl类:
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException,
IOException {
SubmitApplicationRequestProto requestProto =
((SubmitApplicationRequestPBImpl) request).getProto();
try {
return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
这里的proxy.submitApplication(null,requestProto)就是通过RPC向RM提交请求。


 总结
        MapReduce作业提交时连接集群是通过Job的connect()方法实现的,它实际上是构造集群Cluster实例cluster。
        Cluster为连接MapReduce集群的一种工具,提供了一种获取MapReduce集群信息的方法。
        在Cluster内部,有一个与集群进行通信的客户端通信协议ClientProtocol实例client,它由ClientProtocolProvider的静态create()方法构造,
        而Hadoop2.5.2中提供了两种模式的ClientProtocol,分别为Yarn模式的YARNRunner和Local模式的LocalJobRunner,
        Cluster实际上是由它们负责与集群进行通信的,而Yarn模式下,ClientProtocol实例YARNRunner对象内部有一个ResourceManager代理ResourceMgrDelegate实例resMgrDelegate,Yarn模式下整个MapReduce客户端就是由它负责与Yarn集群进行通信,完成诸如作业提交、作业状态查询等过程,通过它获取集群的信息。


参考:
http://blog.csdn.net/lihm0_1/article/details/22186833
http://blog.csdn.net/lipeng_bigdata/article/details/51217534