WordCount作业提交到FileInputFormat类中split切分算法和host选择算法过程源码分析

时间:2023-11-10 22:46:02

参考 FileInputFormat类中split切分算法和host选择算法介绍  以及 Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)  以及 Hadoop中FileInputFormat计算InputSplit的getSplits方法的流程  以及 hadoop作业分片处理以及任务本地性分析(源码分析第一篇)

分析前先介绍一下:

( 这里要注意下, Block 的 hosts 和 Split 的 hosts 不一样, Split 的 hosts 是通过 Split 的 hosts 按一定方法生成的, 如果一个 Block 对应一个 Split (一般情况下是这样的), 这时它们两个 hosts 是一样的. 如果不是一对一( Split > block), 则 Split 需要按一定方法选择 hosts .

Split 和 MapTask 是一一对应的, 一个 Split 对应一个 MapTask. 所以本地性是跟 Split 的 hosts 相关的.

BlocksMap存储 Block 与 BlockInfo 的映射关系, Block 中主要包含3项: long blockId;  // 数据块的唯一标识,即数据块的ID号. long numBytes;  // 数据块包含的文件数据大小. long generationStamp;  // 数据块的版本号,或数据块的时间戳.   BlockInfo( 在 Hadoop-2.7.3 中是 BlockInfoContiguous) 包含所以副本所在主机名.  )

开始分析: ( 这里是 hadoop-2.7.3-src )

以WordCount开始: org.apache.hadoop.examples.WordCount.main() 内部调用 org.apache.hadoop.mapreduce.Job.waitForCompletion(boolean)

// 该段代码在 org.apache.hadoop.examples.WordCount

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration(); //指定作业执行规范 , Configuration:map/reduce的j配置类,向hadoop框架描述map-reduce执行的工作
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //指定job名称,及运行对象
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class); //为job设置Mapper类
job.setCombinerClass(IntSumReducer.class); //为job设置Combiner类
job.setReducerClass(IntSumReducer.class); //为job设置Reducer类
job.setOutputKeyClass(Text.class); //为job的输出数据设置Key类
job.setOutputValueClass(IntWritable.class); //为job输出设置value类
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //为job设置输入路径, org.apache.hadoop.mapreduce.lib.input.FileInputFormat
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1])); //为job设置输出路径
System.exit(job.waitForCompletion(true) ? 0 : 1); //运行job, 调用 Job.waitForCompletion()
}

WordCount

在 Job.waitForCompletion() 函数内部会调用 Job 本类的方法 submit(), 在 submit() 内部接着调用 org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job , Cluster)

// 该段代码在 org.apache.hadoop.mapreduce.Job 中

// ......

/**
* Submit the job to the cluster and wait for it to finish.
* @param verbose print the progress to the user
* @return true if the job succeeded
* @throws IOException thrown if the communication with the
* <code>JobTracker</code> is lost
*/
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {
submit(); // 调用本类的 submit()
}
if (verbose) {
monitorAndPrintJob();
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis =
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {
try {
Thread.sleep(completionPollIntervalMillis);
} catch (InterruptedException ie) {
}
}
}
return isSuccessful();
} // ...... /**
* Submit the job to the cluster and return immediately.
* @throws IOException
*/
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster); //
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}

Job

在 JobSubmitter.submitJobInternal(Job , Cluster) 函数内部调用本类的 writeSplits(Job ,Path )  为job创建分片; 接着 writeSplits(Job ,Path ) 方法内部会调用本类的 (1) writeNewSplits(JobContext , Path ) { Hadoop2.0 会调用这个,新版的API}和 (2) writeOldSplits(JobConf , Path ) { 这个是旧版的 API }; 在 JobSubmitter.writeNewSplits(JobContext , Path ) 方法内部会调用抽象类 org.apache.hadoop.mapreduce.InputFormat.getSplites(JobContext ), 计算job的输入文件的逻辑分片集合; 而在 JobSubmitter.writeOldSplits(JobContext , Path ) 方法内部会调用抽象类 org.apache.hadoop.mapred.InputFormat.getSplites(JobContext , int ), 计算job的输入文件的逻辑分片集合.

// 该段代码在 org.apache.hadoop.mapreduce.JobSubmitter 中

// ......

/**
* Internal method for submitting jobs to the system.
*
* <p>The job submission process involves:
* <ol>
* <li>
* Checking the input and output specifications of the job.
* </li>
* <li>
* Computing the {@link InputSplit}s for the job.
* </li>
* <li>
* Setup the requisite accounting information for the
* {@link DistributedCache} of the job, if necessary.
* </li>
* <li>
* Copying the job's jar and configuration to the map-reduce system
* directory on the distributed file-system.
* </li>
* <li>
* Submitting the job to the <code>JobTracker</code> and optionally
* monitoring it's status.
* </li>
* </ol></p>
* @param job the configuration to submit
* @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
JobStatus submitJobInternal(Job job, Cluster cluster)
throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs
checkSpecs(job); Configuration conf = job.getConfiguration();
addMRFrameworkToDistributedCache(conf); Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
submitHostAddress = ip.getHostAddress();
submitHostName = ip.getHostName();
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
}
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
try {
conf.set(MRJobConfig.USER_NAME,
UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers",
"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir");
// get delegation token for the dir
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf); populateTokenCache(conf, job.getCredentials()); // generate a secret to authenticate shuffle transfers
if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
KeyGenerator keyGen;
try {
keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
keyGen.init(SHUFFLE_KEY_LENGTH);
} catch (NoSuchAlgorithmException e) {
throw new IOException("Error generating shuffle secret key", e);
}
SecretKey shuffleKey = keyGen.generateKey();
TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
job.getCredentials());
}
if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled");
} copyAndConfigureFiles(job, submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir); // 为job创建分片
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted"
// to job file.
String queue = conf.get(MRJobConfig.QUEUE_NAME,
JobConf.DEFAULT_QUEUE_NAME);
AccessControlList acl = submitClient.getQueueAdmins(queue);
conf.set(toFullPropertyName(queue,
QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf); if (conf.getBoolean(
MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
// Add HDFS tracking ids
ArrayList<String> trackingIds = new ArrayList<String>();
for (Token<? extends TokenIdentifier> t :
job.getCredentials().getAllTokens()) {
trackingIds.add(t.decodeIdentifier().getTrackingId());
}
conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()]));
} // Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
} // Write job file to submit dir
writeConf(conf, submitJobFile); //
// Now, actually submit the job (using the submit name)
//
printTokens(jobId, job.getCredentials());
status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials()); // 提交 job
if (status != null) {
return status;
} else {
throw new IOException("Could not launch job");
}
} finally {
if (status == null) {
LOG.info("Cleaning up the staging area " + submitJobDir);
if (jtFs != null && submitJobDir != null)
jtFs.delete(submitJobDir, true); }
}
} // ...... private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
JobConf jConf = (JobConf)job.getConfiguration();
int maps;
if (jConf.getUseNewMapper()) {
maps = writeNewSplits(job, jobSubmitDir);
} else {
maps = writeOldSplits(jConf, jobSubmitDir);
}
return maps;
} // ...... @SuppressWarnings("unchecked")
private <T extends InputSplit>
int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = job.getConfiguration();
InputFormat<?, ?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), conf); List<InputSplit> splits = input.getSplits(job);
T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]); // sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new SplitComparator());
JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
jobSubmitDir.getFileSystem(conf), array);
return array.length;
} // ...... //method to write splits for old api mapper.
private int writeOldSplits(JobConf job, Path jobSubmitDir)
throws IOException {
org.apache.hadoop.mapred.InputSplit[] splits =
job.getInputFormat().getSplits(job, job.getNumMapTasks());
// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
public int compare(org.apache.hadoop.mapred.InputSplit a,
org.apache.hadoop.mapred.InputSplit b) {
try {
long left = a.getLength();
long right = b.getLength();
if (left == right) {
return 0;
} else if (left < right) {
return 1;
} else {
return -1;
}
} catch (IOException ie) {
throw new RuntimeException("Problem getting input split size", ie);
}
}
});
JobSplitWriter.createSplitFiles(jobSubmitDir, job,
jobSubmitDir.getFileSystem(job), splits);
return splits.length;
}

JobSubmitter

(1) 在抽象类 org.apache.hadoop.mapreduce.InputFormat.getSplites(JobContext ) 方法,这里实际调用的是实现类 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplites(JobContext ), ( 这里 FileInputFormat 有两个相同的, 分别是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 和 org.apache.hadoop.mapred.FileInputFormat , 我们选择org.apache.hadoop.mapreduce.lib.input.FileInputFormat 有以下几点原因: 首先, org.apache.hadoop.mapred.FileInputFormat 类是抽象类 InputFormat 的实现类; 其次, WordCount中的FileInputFormat 就是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.)

我们介绍一个概念, 即新旧 MapReduce API , 从0.20.0版本开始, Hadoop 同时提供了新旧两套 MapReduce API. 新 API 在旧 API 基础上进行了封装,使得其在扩展性和易用性方面更好. 旧版 API 放在 org.apache.hadoop.mapred 包中, 而新版 API 则放在 org.apache.hadoop.mapreduce 包及其子包中.

 // 该段代码是在 org.apache.hadoop.mapreduce.InputFormat 中

  /**
* Logically split the set of input files for the job.
*
* <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
* for processing.</p>
*
* <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
* input files are not physically split into chunks. For e.g. a split could
* be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
* also creates the {@link RecordReader} to read the {@link InputSplit}.
*
* @param context job configuration.
* @return an array of {@link InputSplit}s for the job.
*/
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;

在 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplites(JobContext ) 方法中,

(2) 在抽象类 org.apache.hadoop.mapred.InputFormat.getSplites(JobContext ,int ) 方法,这里实际调用的是实现类 org.apache.hadoop.mapred.FileInputFormat.getSplites(JobContext ,int ) ,  这里  org.apache.hadoop.mapred.FileInputFormat 类是抽象类 InputFormat 的实现类. 具体参考 FileInputFormat类中split切分算法和host选择算法介绍  .