The Life of a Hadoop Job

Posted: 2021-10-18 08:02:00

    A simple Hadoop job mainly implements two classes, a map and a reduce. Once configured, it can be submitted to the JobTracker through a JobClient. Before submitting, the JobClient first maps the job into a number of tasks; the JobTracker then hands these tasks out to the TaskTrackers that request them. Each TaskTracker lives on a different host, which is how the computation is parallelized.

    The walkthrough below is organized into several parts: job submission, initializing the job into multiple tasks, requesting and dispatching tasks (which involves scheduling), and task execution.

Part 1: Submitting the Job to the JobTracker

    First, a sample Hadoop job:

public class WordCount {

public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// StringTokenizer itr = new StringTokenizer(value.toString());
String line = value.toString();
String[] line1 = line.split("\\W");
for (String line2 : line1) {
String[] words = line2.split(" ");
for (String wordend : words) {
// split("\\W") can leave empty tokens between consecutive
// delimiters; skip them so they are not counted as words
if (wordend.isEmpty()) {
continue;
}
word.set(wordend);
context.write(word, one);
}
}
// while (itr.hasMoreTokens()) {
// word.set(itr.nextToken());
// context.write(word, one);
// }
}
}

public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
try {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.out.println("job start!");
System.exit(job.waitForCompletion(true) ? 0 : 1);
System.out.println("completed!");
} catch (Exception e) {
e.printStackTrace();
}

}
}
1 Job.waitForCompletion

After configuring the job, we submit it with job.waitForCompletion, so that is where we start:

  public boolean waitForCompletion(boolean verbose) throws IOException,
InterruptedException, ClassNotFoundException {
if (state == JobState.DEFINE) {
submit();
}
if (verbose) {
jobClient.monitorAndPrintJob(conf, info);
} else {
info.waitForCompletion();
}
return isSuccessful();
}

 

  public void submit() throws IOException, InterruptedException, 
ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
info = jobClient.submitJobInternal(conf);
super.setJobID(info.getID());
state = JobState.RUNNING;
}

From Job's constructor we can see that jobClient is an instance of JobClient:

  public Job(Configuration conf) throws IOException {
super(conf, null);
jobClient = new JobClient((JobConf) getConfiguration());
}


 

2 JobClient.submitJobInternal

  public 
RunningJob submitJobInternal(JobConf job
) throws FileNotFoundException,
ClassNotFoundException,
InterruptedException,
IOException {
/*
* configure the command line options correctly on the submitting dfs
*/

JobID jobId = jobSubmitClient.getNewJobId();
Path submitJobDir = new Path(getSystemDir(), jobId.toString());
Path submitJarFile = new Path(submitJobDir, "job.jar");
Path submitSplitFile = new Path(submitJobDir, "job.split");
configureCommandLineOptions(job, submitJobDir, submitJarFile);
Path submitJobFile = new Path(submitJobDir, "job.xml");
int reduces = job.getNumReduceTasks();
JobContext context = new JobContext(job, jobId);

// Check the output specification
if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
ReflectionUtils.newInstance(context.getOutputFormatClass(), job);
output.checkOutputSpecs(context);
} else {
job.getOutputFormat().checkOutputSpecs(fs, job);
}

// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
int maps;
if (job.getUseNewMapper()) {
maps = writeNewSplits(context, submitSplitFile);
} else {
maps = writeOldSplits(job, submitSplitFile);
}
job.set("mapred.job.split.file", submitSplitFile.toString());
job.setNumMapTasks(maps);

// Write job file to JobTracker's fs
FSDataOutputStream out =
FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));

try {
job.writeXml(out);
} finally {
out.close();
}

//
// Now, actually submit the job (using the submit name)
//
JobStatus status = jobSubmitClient.submitJob(jobId);
if (status != null) {
return new NetworkedJob(status);
} else {
throw new IOException("Could not launch job");
}
}

Let's pull out a few pieces of this method:

    int maps;
if (job.getUseNewMapper()) {
maps = writeNewSplits(context, submitSplitFile);
} else {
maps = writeOldSplits(job, submitSplitFile);
}
job.set("mapred.job.split.file", submitSplitFile.toString());
job.setNumMapTasks(maps);

The code above writes the input splits to the job.split file; the number of splits becomes the number of map tasks.
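
For intuition about where that number comes from: writeNewSplits asks the job's InputFormat for its splits and serializes them to job.split, and the split count is the map count. A minimal sketch of that idea, assuming the new-API path (the helper name countMaps is ours, not Hadoop's):

int countMaps(JobContext context) throws Exception {
  // instantiate the InputFormat configured for this job
  org.apache.hadoop.mapreduce.InputFormat<?, ?> input =
      ReflectionUtils.newInstance(context.getInputFormatClass(),
                                  context.getConfiguration());
  // typically one split per HDFS block of each input file
  List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(context);
  return splits.size(); // this is what becomes job.setNumMapTasks(maps)
}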

 

 FSDataOutputStream out = 
FileSystem.create(fs, submitJobFile,
new FsPermission(JOB_FILE_PERMISSION));

try {
job.writeXml(out);
} finally {
out.close();
}

The code above writes every entry in the Configuration to the job.xml file in the job's submission directory.

 

configureCommandLineOptions(job, submitJobDir, submitJarFile);


This call copies the job.jar file (among other submission resources) to the job's directory on the JobTracker's filesystem.
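
Based on the 0.20-era source, the jar-copy portion of configureCommandLineOptions looks roughly like the sketch below; the real method also uploads any -files, -libjars and -archives arguments, so treat this as an approximation rather than the exact code:

String originalJarPath = job.getJar();
if (originalJarPath != null) {
  // replicate the jar widely, since every TaskTracker will read it
  short replication = (short) job.getInt("mapred.submit.replication", 10);
  job.setJar(submitJarFile.toString());
  fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
  fs.setReplication(submitJarFile, replication);
}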

 

All of the work above is done by the jobClient; at the end, it calls a JobTracker method to actually submit the job:

JobID jobId = jobSubmitClient.getNewJobId();
JobStatus status = jobSubmitClient.submitJob(jobId);

 

What is jobSubmitClient? It is a field of JobClient:

private JobSubmissionProtocol jobSubmitClient;

JobClient's constructor calls init, which initializes this field:

  public void init(JobConf conf) throws IOException {
String tracker = conf.get("mapred.job.tracker", "local");
if ("local".equals(tracker)) {
this.jobSubmitClient = new LocalJobRunner(conf);
} else {
this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
}
}

 

createRPCProxy obtains a JobSubmissionProtocol instance through Hadoop's own remote procedure call (RPC) mechanism:

  private JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
Configuration conf) throws IOException {
return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
JobSubmissionProtocol.versionID, addr, getUGI(conf), conf,
NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
}

 

Note: JobTracker implements the JobSubmissionProtocol interface, so calls made through this proxy are executed by the JobTracker.
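
To make the RPC pattern concrete, here is a minimal hypothetical protocol pair (MyProtocol is purely illustrative; the real JobSubmissionProtocol has the same shape, extending VersionedProtocol and declaring a versionID):

// hypothetical protocol interface, for illustration only
interface MyProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
  long versionID = 1L;
  String echo(String msg) throws IOException;
}

// client side: the proxy marshals each method call over the wire
// MyProtocol proxy = (MyProtocol) RPC.getProxy(
//     MyProtocol.class, MyProtocol.versionID, addr, conf);

// server side: the JobTracker passes itself to RPC.getServer, so
// incoming JobSubmissionProtocol calls dispatch to its own methods.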


3 JobTracker.submitJob

It mainly creates a JobInProgress instance and calls addJob:

JobInProgress job = new JobInProgress(jobId, this, this.conf);
return addJob(jobId, job); 


4 addJob()

/**
* Adds a job to the jobtracker. Make sure that the checks are inplace before
* adding a job. This is the core job submission logic
* @param jobId The id for the job submitted which needs to be added
*/
private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
totalSubmissions++;

synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
try {
listener.jobAdded(job);
} catch (IOException ioe) {
LOG.warn("Failed to add and so skipping the job : "
+ job.getJobID() + ". Exception : " + ioe);
}
}
}
}
myInstrumentation.submitJob(job.getJobConf(), jobId);
return job.getStatus();
}

jobInProgressListeners is a list of listeners registered on the JobTracker; each listener maintains its own list of jobs. listener.jobAdded(job); appends the job to that listener's list.
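
For illustration, a listener in the spirit of EagerTaskInitializationListener might look like the sketch below (a simplification from memory of the 0.20 source; the real listener also re-sorts the queue by job priority):

class QueueingListener extends JobInProgressListener {
  private final List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();

  public void jobAdded(JobInProgress job) throws IOException {
    synchronized (jobInitQueue) {
      jobInitQueue.add(job);    // remember the new job ...
      jobInitQueue.notifyAll(); // ... and wake the init thread (see Part 2)
    }
  }

  public void jobRemoved(JobInProgress job) {
    synchronized (jobInitQueue) {
      jobInitQueue.remove(job);
    }
  }

  public void jobUpdated(JobChangeEvent event) { }
}

The jobInitQueue here is the same kind of queue that JobInitManager drains in Part 2 below.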

 

 

Job submission ends here; next we look at the initialization phase.

Part 2: Initializing the Job into Multiple Tasks

1 The relationship between JobTracker and TaskScheduler

The JobTracker startup code:

  public static void main(String argv[]
) throws IOException, InterruptedException {
StringUtils.startupShutdownMessage(JobTracker.class, argv, LOG);
if (argv.length != 0) {
System.out.println("usage: JobTracker");
System.exit(-1);
}

try {
JobTracker tracker = startTracker(new JobConf());
tracker.offerService();
} catch (Throwable e) {
LOG.fatal(StringUtils.stringifyException(e));
System.exit(-1);
}
}


startTracker calls the constructor and wires the JobTracker into the taskScheduler (main calls a one-argument overload that delegates to the version shown here):

  public static JobTracker startTracker(JobConf conf, String identifier) 
throws IOException, InterruptedException {
JobTracker result = null;
while (true) {
try {
result = new JobTracker(conf, identifier);
result.taskScheduler.setTaskTrackerManager(result);
break;
} catch (VersionMismatch e) {
throw e;
} catch (BindException e) {
throw e;
} catch (UnknownHostException e) {
throw e;
} catch (AccessControlException ace) {
// in case of jobtracker not having right access
// bail out
throw ace;
} catch (IOException e) {
LOG.warn("Error starting tracker: " +
StringUtils.stringifyException(e));
}
Thread.sleep(1000);
}
if (result != null) {
JobEndNotifier.startNotifier();
}
return result;
}


The taskScheduler is created in the JobTracker constructor itself:

    Class<? extends TaskScheduler> schedulerClass
= conf.getClass("mapred.jobtracker.taskScheduler",
JobQueueTaskScheduler.class, TaskScheduler.class);
taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);


Note: the code above shows that the task scheduler is configurable through mapred.jobtracker.taskScheduler. The default is JobQueueTaskScheduler, which extends TaskScheduler.
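
For example, switching to a different scheduler is just a configuration change; a sketch in code, assuming the fair-scheduler contrib jar is on the classpath:

// equivalent to setting mapred.jobtracker.taskScheduler in mapred-site.xml
JobConf conf = new JobConf();
conf.setClass("mapred.jobtracker.taskScheduler",
    org.apache.hadoop.mapred.FairScheduler.class,
    org.apache.hadoop.mapred.TaskScheduler.class);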

 

2 The JobTracker initializes the job through the TaskScheduler

In the previous section, main called two important functions: startTracker and offerService. The first creates the JobTracker instance and initializes its fields. Now let's look at how the second one affects job initialization; the relevant line is taskScheduler.start();

  public void offerService() throws InterruptedException, IOException {
// Prepare for recovery. This is done irrespective of the status of restart
// flag.
while (true) {
try {
recoveryManager.updateRestartCount();
break;
} catch (IOException ioe) {
LOG.warn("Failed to initialize recovery manager. ", ioe);
// wait for some time
Thread.sleep(FS_ACCESS_RETRY_PERIOD);
LOG.warn("Retrying...");
}
}

taskScheduler.start();

// Start the recovery after starting the scheduler
try {
recoveryManager.recover();
} catch (Throwable t) {
LOG.warn("Recovery manager crashed! Ignoring.", t);
}

this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
this.expireTrackersThread.start();
this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
this.retireJobsThread.start();
expireLaunchingTaskThread.start();

if (completedJobStatusStore.isActive()) {
completedJobsStoreThread = new Thread(completedJobStatusStore,
"completedjobsStore-housekeeper");
completedJobsStoreThread.start();
}

// start the inter-tracker server once the jt is ready
this.interTrackerServer.start();

synchronized (this) {
state = State.RUNNING;
}
LOG.info("Starting RUNNING");

this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}

 

taskScheduler is usually a JobQueueTaskScheduler instance; let's look at its start method:

  public synchronized void start() throws IOException {
super.start();
taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
eagerTaskInitializationListener.start();
taskTrackerManager.addJobInProgressListener(
eagerTaskInitializationListener);
}

Two listeners are involved here; their construction is straightforward, so we skip it. The line that matters for job initialization is eagerTaskInitializationListener.start();

 

EagerTaskInitializationListener.start() is as follows:

  public void start() throws IOException {
this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
jobInitManagerThread.setDaemon(true);
this.jobInitManagerThread.start();
}


jobInitManager is an instance of JobInitManager, an inner class of EagerTaskInitializationListener. The class implements the Runnable interface and loops forever, draining the job-initialization queue:

  class JobInitManager implements Runnable {

public void run() {
JobInProgress job = null;
while (true) {
try {
synchronized (jobInitQueue) {
while (jobInitQueue.isEmpty()) {
jobInitQueue.wait();
}
job = jobInitQueue.remove(0);
}
threadPool.execute(new InitJob(job));
} catch (InterruptedException t) {
LOG.info("JobInitManagerThread interrupted.");
break;
}
}
LOG.info("Shutting down thread pool");
threadPool.shutdownNow();
}
}

 

InitJob is another inner class; its run method calls ttm.initJob(job):

  class InitJob implements Runnable {

private JobInProgress job;

public InitJob(JobInProgress job) {
this.job = job;
}

public void run() {
ttm.initJob(job);
}
}


What is ttm? It is the TaskTrackerManager, injected through this setter:

  public void setTaskTrackerManager(TaskTrackerManager ttm) {
this.ttm = ttm;
}


This setter is exactly what the JobTracker calls in startTracker (see the previous section):

        result.taskScheduler.setTaskTrackerManager(result);


3 JobTracker.initJob

What this function ultimately calls is job.initTasks():

  public void initJob(JobInProgress job) {
if (null == job) {
LOG.info("Init on null job is not valid");
return;
}

try {
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
LOG.info("Initializing " + job.getJobID());
job.initTasks();
// Inform the listeners if the job state has changed
// Note : that the job will be in PREP state.
JobStatus newStatus = (JobStatus)job.getStatus().clone();
if (prevStatus.getRunState() != newStatus.getRunState()) {
JobStatusChangeEvent event =
new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
newStatus);
synchronized (JobTracker.this) {
updateJobInProgressListeners(event);
}
}
} catch (KillInterruptedException kie) {
// If job was killed during initialization, job state will be KILLED
LOG.error("Job initialization interrupted:\n" +
StringUtils.stringifyException(kie));
killJob(job);
} catch (Throwable t) {
// If the job initialization is failed, job state will be FAILED
LOG.error("Job initialization failed:\n" +
StringUtils.stringifyException(t));
failJob(job);
}
}

 

JobInProgress.initTasks():

  /**
* Construct the splits, etc. This is invoked from an async
* thread so that split-computation doesn't block anyone.
*/
public synchronized void initTasks()
throws IOException, KillInterruptedException {
if (tasksInited.get() || isComplete()) {
return;
}
synchronized(jobInitKillStatus){
if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
return;
}
jobInitKillStatus.initStarted = true;
}

LOG.info("Initializing " + jobId);

// log job info
JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
this.startTime, hasRestarted());
// log the job priority
setPriority(this.priority);

//
// read input splits and create a map per a split
//
String jobFile = profile.getJobFile();

Path sysDir = new Path(this.jobtracker.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
DataInputStream splitFile =
fs.open(new Path(conf.get("mapred.job.split.file")));
JobClient.RawSplit[] splits;
try {
splits = JobClient.readSplitFile(splitFile);
} finally {
splitFile.close();
}
numMapTasks = splits.length;


// if the number of splits is larger than a configured value
// then fail the job.
int maxTasks = jobtracker.getMaxTasksPerJob();
if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
throw new IOException(
"The number of tasks for this job " +
(numMapTasks + numReduceTasks) +
" exceeds the configured limit " + maxTasks);
}
jobtracker.getInstrumentation().addWaiting(
getJobID(), numMapTasks + numReduceTasks);

maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i);
}
LOG.info("Input size for job " + jobId + " = " + inputLength
+ ". Number of splits = " + splits.length);
if (numMapTasks > 0) {
nonRunningMapCache = createCache(splits, maxLevel);
}

// set the launch time
this.launchTime = System.currentTimeMillis();

//
// Create reduce tasks
//
this.reduces = new TaskInProgress[numReduceTasks];
for (int i = 0; i < numReduceTasks; i++) {
reduces[i] = new TaskInProgress(jobId, jobFile,
numMapTasks, i,
jobtracker, conf, this);
nonRunningReduces.add(reduces[i]);
}

// Calculate the minimum number of maps to be complete before
// we should start scheduling reduces
completedMapsForReduceSlowstart =
(int)Math.ceil(
(conf.getFloat("mapred.reduce.slowstart.completed.maps",
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
numMapTasks));

// create cleanup two cleanup tips, one map and one reduce.
cleanup = new TaskInProgress[2];

// cleanup map tip. This map doesn't use any splits. Just assign an empty
// split.
JobClient.RawSplit emptySplit = new JobClient.RawSplit();
cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks);
cleanup[0].setJobCleanupTask();

// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this);
cleanup[1].setJobCleanupTask();

// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];

// setup map tip. This map doesn't use any split. Just assign an empty
// split.
setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
jobtracker, conf, this, numMapTasks + 1 );
setup[0].setJobSetupTask();

// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this);
setup[1].setJobSetupTask();

synchronized(jobInitKillStatus){
jobInitKillStatus.initDone = true;
if(jobInitKillStatus.killed) {
throw new KillInterruptedException("Job " + jobId + " killed in init");
}
}

tasksInited.set(true);
JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
numMapTasks, numReduceTasks);
}
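
One line in initTasks worth a worked example is the reduce slow-start threshold. Assuming the default factor of 0.05 and a job with 100 map tasks:

// completedMapsForReduceSlowstart = ceil(0.05 * 100) = 5,
// so the scheduler will not hand out reduce tasks for this job
// until 5 of its map tasks have completed
int completedMapsForReduceSlowstart = (int) Math.ceil(0.05f * 100);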


Part 3: Requesting and Dispatching Tasks

1 Communication between JobTracker and TaskTracker

JobTracker and TaskTracker likewise communicate via Hadoop's remote procedure call mechanism.

 

The JobTracker constructor first builds the RPC server:

this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);

interTrackerServer is of type Server; the service is then started in the offerService method:

    this.interTrackerServer.start();

 

TaskTracker has a field named jobClient, which is exactly the client side of this RPC channel. It is initialized in TaskTracker's initialize method, which is itself called from the constructor; the relevant part of initialize is shown below. Since JobTracker implements InterTrackerProtocol, jobClient is a client of the JobTracker service and can invoke the JobTracker's methods.

    this.jobClient = (InterTrackerProtocol) 
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);

 

Now let's see how jobClient is used.

At startup, main instantiates a TaskTracker and immediately calls its run method:

  public static void main(String argv[]) throws Exception {
StringUtils.startupShutdownMessage(TaskTracker.class, argv, LOG);
if (argv.length != 0) {
System.out.println("usage: TaskTracker");
System.exit(-1);
}
try {
JobConf conf=new JobConf();
// enable the server to track time spent waiting on locks
ReflectionUtils.setContentionTracing
(conf.getBoolean("tasktracker.contention.tracking", false));
new TaskTracker(conf).run();
} catch (Throwable e) {
LOG.error("Can not start task tracker because "+
StringUtils.stringifyException(e));
System.exit(-1);
}
}

 

The run method calls offerService in a loop:

  public void run() {
try {
startCleanupThreads();
boolean denied = false;
while (running && !shuttingDown && !denied) {
boolean staleState = false;
try {
// This while-loop attempts reconnects if we get network errors
while (running && !staleState && !shuttingDown && !denied) {
try {
State osState = offerService();
if (osState == State.STALE) {
staleState = true;
} else if (osState == State.DENIED) {
denied = true;
}
} catch (Exception ex) {
if (!shuttingDown) {
LOG.info("Lost connection to JobTracker [" +
jobTrackAddr + "]. Retrying...", ex);
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
}
}
}
}
} finally {
close();
}
if (shuttingDown) { return; }
LOG.warn("Reinitializing local state");
initialize();
}
if (denied) {
shutdown();
}
} catch (IOException iex) {
LOG.error("Got fatal exception while reinitializing TaskTracker: " +
StringUtils.stringifyException(iex));
return;
}
}


offerService, too, runs a loop that repeatedly calls transmitHeartBeat. That method returns a HeartbeatResponse, which wraps any tasks the server has dispatched; offerService then adds those tasks to the local task queues:

  State offerService() throws Exception {
long lastHeartbeat = 0;

while (running && !shuttingDown) {
try {
long now = System.currentTimeMillis();

long waitTime = heartbeatInterval - (now - lastHeartbeat);
if (waitTime > 0) {
// sleeps for the wait time
Thread.sleep(waitTime);
}

// If the TaskTracker is just starting up:
// 1. Verify the buildVersion
// 2. Get the system directory & filesystem
if(justInited) {
String jobTrackerBV = jobClient.getBuildVersion();
if(!VersionInfo.getBuildVersion().equals(jobTrackerBV)) {
String msg = "Shutting down. Incompatible buildVersion." +
"\nJobTracker's: " + jobTrackerBV +
"\nTaskTracker's: "+ VersionInfo.getBuildVersion();
LOG.error(msg);
try {
jobClient.reportTaskTrackerError(taskTrackerName, null, msg);
} catch(Exception e ) {
LOG.info("Problem reporting to jobtracker: " + e);
}
return State.DENIED;
}

String dir = jobClient.getSystemDir();
if (dir == null) {
throw new IOException("Failed to get system directory");
}
systemDirectory = new Path(dir);
systemFS = systemDirectory.getFileSystem(fConf);
}

// Send the heartbeat and process the jobtracker's directives
HeartbeatResponse heartbeatResponse = transmitHeartBeat(now);

// Note the time when the heartbeat returned, use this to decide when to send the
// next heartbeat
lastHeartbeat = System.currentTimeMillis();


// Check if the map-event list needs purging
Set<JobID> jobs = heartbeatResponse.getRecoveredJobs();
if (jobs.size() > 0) {
synchronized (this) {
// purge the local map events list
for (JobID job : jobs) {
RunningJob rjob;
synchronized (runningJobs) {
rjob = runningJobs.get(job);
if (rjob != null) {
synchronized (rjob) {
FetchStatus f = rjob.getFetchStatus();
if (f != null) {
f.reset();
}
}
}
}
}

// Mark the reducers in shuffle for rollback
synchronized (shouldReset) {
for (Map.Entry<TaskAttemptID, TaskInProgress> entry
: runningTasks.entrySet()) {
if (entry.getValue().getStatus().getPhase() == Phase.SHUFFLE) {
this.shouldReset.add(entry.getKey());
}
}
}
}
}

TaskTrackerAction[] actions = heartbeatResponse.getActions();
if(LOG.isDebugEnabled()) {
LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
heartbeatResponse.getResponseId() + " and " +
((actions != null) ? actions.length : 0) + " actions");
}
if (reinitTaskTracker(actions)) {
return State.STALE;
}

// resetting heartbeat interval from the response.
heartbeatInterval = heartbeatResponse.getHeartbeatInterval();
justStarted = false;
justInited = false;
if (actions != null){
for(TaskTrackerAction action: actions) {
if (action instanceof LaunchTaskAction) {
addToTaskQueue((LaunchTaskAction)action);
} else if (action instanceof CommitTaskAction) {
CommitTaskAction commitAction = (CommitTaskAction)action;
if (!commitResponses.contains(commitAction.getTaskID())) {
LOG.info("Received commit task action for " +
commitAction.getTaskID());
commitResponses.add(commitAction.getTaskID());
}
} else {
tasksToCleanup.put(action);
}
}
}
markUnresponsiveTasks();
killOverflowingTasks();

//we've cleaned up, resume normal operation
if (!acceptNewTasks && isIdle()) {
acceptNewTasks=true;
}
//The check below may not be required every iteration but we are
//erring on the side of caution here. We have seen many cases where
//the call to jetty's getLocalPort() returns different values at
//different times. Being a real paranoid here.
checkJettyPort(server.getPort());
} catch (InterruptedException ie) {
LOG.info("Interrupted. Closing down.");
return State.INTERRUPTED;
} catch (DiskErrorException de) {
String msg = "Exiting task tracker for disk error:\n" +
StringUtils.stringifyException(de);
LOG.error(msg);
synchronized (this) {
jobClient.reportTaskTrackerError(taskTrackerName,
"DiskErrorException", msg);
}
return State.STALE;
} catch (RemoteException re) {
String reClass = re.getClassName();
if (DisallowedTaskTrackerException.class.getName().equals(reClass)) {
LOG.info("Tasktracker disallowed by JobTracker.");
return State.DENIED;
}
} catch (Exception except) {
String msg = "Caught exception: " +
StringUtils.stringifyException(except);
LOG.error(msg);
}
}

return State.NORMAL;
}

 

The TaskTracker maintains two TaskLauncher instances, mapLauncher and reduceLauncher, each holding its own queue of TaskInProgress objects. Note that TaskInProgress here is an inner class of TaskTracker (not the TaskInProgress class used by JobInProgress):

  private void addToTaskQueue(LaunchTaskAction action) {
if (action.getTask().isMapTask()) {
mapLauncher.addToTaskQueue(action);
} else {
reduceLauncher.addToTaskQueue(action);
}
}

 

    public void addToTaskQueue(LaunchTaskAction action) {
synchronized (tasksToLaunch) {
TaskInProgress tip = registerTask(action, this);
tasksToLaunch.add(tip);
tasksToLaunch.notifyAll();
}
}

 

  private TaskInProgress registerTask(LaunchTaskAction action, 
TaskLauncher launcher) {
Task t = action.getTask();
LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
" task's state:" + t.getState());
TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized (this) {
tasks.put(t.getTaskID(), tip);
runningTasks.put(t.getTaskID(), tip);
boolean isMap = t.isMapTask();
if (isMap) {
mapTotal++;
} else {
reduceTotal++;
}
}
return tip;
}


 


How does transmitHeartBeat request new tasks?

  private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
// Send Counters in the status once every COUNTER_UPDATE_INTERVAL
boolean sendCounters;
if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
sendCounters = true;
previousUpdate = now;
}
else {
sendCounters = false;
}

//
// Check if the last heartbeat got through...
// if so then build the heartbeat information for the JobTracker;
// else resend the previous status information.
//
if (status == null) {
synchronized (this) {
status = new TaskTrackerStatus(taskTrackerName, localHostname,
httpPort,
cloneAndResetRunningTaskStatuses(
sendCounters),
failures,
maxCurrentMapTasks,
maxCurrentReduceTasks);
}
} else {
LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
"' with reponseId '" + heartbeatResponseId);
}

//
// Check if we should ask for a new Task
//
boolean askForNewTask;
long localMinSpaceStart;
synchronized (this) {
askForNewTask = (status.countMapTasks() < maxCurrentMapTasks ||
status.countReduceTasks() < maxCurrentReduceTasks) &&
acceptNewTasks;
localMinSpaceStart = minSpaceStart;
}
if (askForNewTask) {
checkLocalDirs(fConf.getLocalDirs());
askForNewTask = enoughFreeSpace(localMinSpaceStart);
long freeDiskSpace = getFreeSpace();
long totVmem = getTotalVirtualMemoryOnTT();
long totPmem = getTotalPhysicalMemoryOnTT();

status.getResourceStatus().setAvailableSpace(freeDiskSpace);
status.getResourceStatus().setTotalVirtualMemory(totVmem);
status.getResourceStatus().setTotalPhysicalMemory(totPmem);
status.getResourceStatus().setMapSlotMemorySizeOnTT(
mapSlotMemorySizeOnTT);
status.getResourceStatus().setReduceSlotMemorySizeOnTT(
reduceSlotSizeMemoryOnTT);
}

//
// Xmit the heartbeat
//
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted,
justInited,
askForNewTask,
heartbeatResponseId);

//
// The heartbeat got through successfully!
//
heartbeatResponseId = heartbeatResponse.getResponseId();

synchronized (this) {
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
!taskStatus.inTaskCleanupPhase()) {
if (taskStatus.getIsMap()) {
mapTotal--;
} else {
reduceTotal--;
}
try {
myInstrumentation.completeTask(taskStatus.getTaskID());
} catch (MetricsException me) {
LOG.warn("Caught: " + StringUtils.stringifyException(me));
}
runningTasks.remove(taskStatus.getTaskID());
}
}

// Clear transient status information which should only
// be sent once to the JobTracker
for (TaskInProgress tip: runningTasks.values()) {
tip.getStatus().clearStatus();
}
}

// Force a rebuild of 'status' on the next iteration
status = null;

return heartbeatResponse;
}

We can see that it ultimately calls the proxy's heartbeat method, which amounts to executing JobTracker.heartbeat on the server side. So let's look at JobTracker.heartbeat.

 

2 The JobTracker dispatches tasks

The heartbeat method:

  public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
boolean restarted,
boolean initialContact,
boolean acceptNewTasks,
short responseId)
throws IOException {
LOG.debug("Got heartbeat from: " + status.getTrackerName() +
" (restarted: " + restarted +
" initialContact: " + initialContact +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);

// Make sure heartbeat is from a tasktracker allowed by the jobtracker.
if (!acceptTaskTracker(status)) {
throw new DisallowedTaskTrackerException(status);
}

// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
long now = System.currentTimeMillis();
boolean isBlacklisted = false;
if (restarted) {
faultyTrackers.markTrackerHealthy(status.getHost());
} else {
isBlacklisted =
faultyTrackers.shouldAssignTasksToTracker(status.getHost(), now);
}

HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
boolean addRestartInfo = false;

if (initialContact != true) {
// If this isn't the 'initial contact' from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the 'previous heartbeat'; if so, ask the
// tasktracker to re-initialize itself.
if (prevHeartbeatResponse == null) {
// This is the first heartbeat from the old tracker to the newly
// started JobTracker
if (hasRestarted()) {
addRestartInfo = true;
// inform the recovery manager about this tracker joining back
recoveryManager.unMarkTracker(trackerName);
} else {
// Jobtracker might have restarted but no recovery is needed
// otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
"heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
return new HeartbeatResponse(responseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}

} else {

// It is completely safe to not process a 'duplicate' heartbeat from a
// {@link TaskTracker} since it resends the heartbeat when rpcs are
// lost see {@link TaskTracker.transmitHeartbeat()};
// acknowledge it by re-sending the previous response to let the
// {@link TaskTracker} go forward.
if (prevHeartbeatResponse.getResponseId() != responseId) {
LOG.info("Ignoring 'duplicate' heartbeat from '" +
trackerName + "'; resending the previous 'lost' response");
return prevHeartbeatResponse;
}
}
}

// Process this heartbeat
short newResponseId = (short)(responseId + 1);
status.setLastSeen(now);
if (!processHeartbeat(status, initialContact)) {
if (prevHeartbeatResponse != null) {
trackerToHeartbeatResponseMap.remove(trackerName);
}
return new HeartbeatResponse(newResponseId,
new TaskTrackerAction[] {new ReinitTrackerAction()});
}

// Initialize the response to be sent for the heartbeat
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();

// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
} else {
List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
if (tasks == null ) {
tasks = taskScheduler.assignTasks(taskTrackerStatus);
}
if (tasks != null) {
for (Task task : tasks) {
expireLaunchingTasks.addNewTask(task.getTaskID());
LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskID());
actions.add(new LaunchTaskAction(task));
}
}
}
}

// Check for tasks to be killed
List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName);
if (killTasksList != null) {
actions.addAll(killTasksList);
}

// Check for jobs to be killed/cleanedup
List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
if (killJobsList != null) {
actions.addAll(killJobsList);
}

// Check for tasks whose outputs can be saved
List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
if (commitTasksList != null) {
actions.addAll(commitTasksList);
}

// calculate next heartbeat interval and put in heartbeat response
int nextInterval = getNextHeartbeatInterval();
response.setHeartbeatInterval(nextInterval);
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));

// check if the restart info is req
if (addRestartInfo) {
response.setRecoveredJobs(recoveryManager.getJobsToRecover());
}

// Update the trackerToHeartbeatResponseMap
trackerToHeartbeatResponseMap.put(trackerName, response);

// Done processing the hearbeat, now remove 'marked' tasks
removeMarkedTasks(trackerName);

return response;
}


 

 

The line tasks = taskScheduler.assignTasks(taskTrackerStatus); ends up calling the scheduling method of the TaskScheduler (JobQueueTaskScheduler here):

  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
throws IOException {

ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
final int numTaskTrackers = clusterStatus.getTaskTrackers();
final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();

Collection<JobInProgress> jobQueue =
jobQueueJobInProgressListener.getJobQueue();

//
// Get map + reduce counts for the current tracker.
//
final int trackerMapCapacity = taskTracker.getMaxMapTasks();
final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
final int trackerRunningMaps = taskTracker.countMapTasks();
final int trackerRunningReduces = taskTracker.countReduceTasks();

// Assigned tasks
List<Task> assignedTasks = new ArrayList<Task>();

//
// Compute (running + pending) map and reduce task numbers across pool
//
int remainingReduceLoad = 0;
int remainingMapLoad = 0;
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() == JobStatus.RUNNING) {
remainingMapLoad += (job.desiredMaps() - job.finishedMaps());
if (job.scheduleReduces()) {
remainingReduceLoad +=
(job.desiredReduces() - job.finishedReduces());
}
}
}
}

// Compute the 'load factor' for maps and reduces
double mapLoadFactor = 0.0;
if (clusterMapCapacity > 0) {
mapLoadFactor = (double)remainingMapLoad / clusterMapCapacity;
}
double reduceLoadFactor = 0.0;
if (clusterReduceCapacity > 0) {
reduceLoadFactor = (double)remainingReduceLoad / clusterReduceCapacity;
}

//
// In the below steps, we allocate first map tasks (if appropriate),
// and then reduce tasks if appropriate. We go through all jobs
// in order of job arrival; jobs only get serviced if their
// predecessors are serviced, too.
//

//
// We assign tasks to the current taskTracker if the given machine
// has a workload that's less than the maximum load of that kind of
// task.
// However, if the cluster is close to getting loaded i.e. we don't
// have enough _padding_ for speculative executions etc., we only
// schedule the "highest priority" task i.e. the task from the job
// with the highest priority.
//

final int trackerCurrentMapCapacity =
Math.min((int)Math.ceil(mapLoadFactor * trackerMapCapacity),
trackerMapCapacity);
int availableMapSlots = trackerCurrentMapCapacity - trackerRunningMaps;
boolean exceededMapPadding = false;
if (availableMapSlots > 0) {
exceededMapPadding =
exceededPadding(true, clusterStatus, trackerMapCapacity);
}

int numLocalMaps = 0;
int numNonLocalMaps = 0;
scheduleMaps:
for (int i=0; i < availableMapSlots; ++i) {
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}

Task t = null;

// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());
if (t != null) {
assignedTasks.add(t);
++numLocalMaps;

// Don't assign map tasks to the hilt!
// Leave some free slots in the cluster for future task-failures,
// speculative tasks etc. beyond the highest priority job
if (exceededMapPadding) {
break scheduleMaps;
}

// Try all jobs again for the next Map task
break;
}

// Try to schedule a node-local or rack-local Map task
t =
job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts());

if (t != null) {
assignedTasks.add(t);
++numNonLocalMaps;

// We assign at most 1 off-switch or speculative task
// This is to prevent TaskTrackers from stealing local-tasks
// from other TaskTrackers.
break scheduleMaps;
}
}
}
}
int assignedMaps = assignedTasks.size();

//
// Same thing, but for reduce tasks
// However we _never_ assign more than 1 reduce task per heartbeat
//
final int trackerCurrentReduceCapacity =
Math.min((int)Math.ceil(reduceLoadFactor * trackerReduceCapacity),
trackerReduceCapacity);
final int availableReduceSlots =
Math.min((trackerCurrentReduceCapacity - trackerRunningReduces), 1);
boolean exceededReducePadding = false;
if (availableReduceSlots > 0) {
exceededReducePadding = exceededPadding(false, clusterStatus,
trackerReduceCapacity);
synchronized (jobQueue) {
for (JobInProgress job : jobQueue) {
if (job.getStatus().getRunState() != JobStatus.RUNNING ||
job.numReduceTasks == 0) {
continue;
}

Task t =
job.obtainNewReduceTask(taskTracker, numTaskTrackers,
taskTrackerManager.getNumberOfUniqueHosts()
);
if (t != null) {
assignedTasks.add(t);
break;
}

// Don't assign reduce tasks to the hilt!
// Leave some free slots in the cluster for future task-failures,
// speculative tasks etc. beyond the highest priority job
if (exceededReducePadding) {
break;
}
}
}
}

if (LOG.isDebugEnabled()) {
LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
"[" + mapLoadFactor + ", " + trackerMapCapacity + ", " +
trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" +
(trackerCurrentMapCapacity - trackerRunningMaps) + ", " +
assignedMaps + " (" + numLocalMaps + ", " + numNonLocalMaps +
")] [" + reduceLoadFactor + ", " + trackerReduceCapacity + ", " +
trackerCurrentReduceCapacity + "," + trackerRunningReduces +
"] -> [" + (trackerCurrentReduceCapacity - trackerRunningReduces) +
", " + (assignedTasks.size()-assignedMaps) + "]");
}

return assignedTasks;
}
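
The load-factor arithmetic above is easier to follow with concrete numbers. Say the cluster has 10 TaskTrackers with 2 map slots each, and all running jobs together still need 6 map tasks:

// clusterMapCapacity        = 10 * 2 = 20
// remainingMapLoad          = 6
// mapLoadFactor             = 6 / 20.0 = 0.3
// trackerCurrentMapCapacity = min(ceil(0.3 * 2), 2) = 1
int trackerCurrentMapCapacity = Math.min((int) Math.ceil(0.3 * 2), 2);

Each tracker is throttled to its proportional share of the remaining load, so a tracker already running one map task gets no new map task this heartbeat, leaving padding for speculative execution and task failures.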


 

Part 4: Task Execution

The earlier discussion skipped the initialize method; it is called at the end of the TaskTracker constructor:

  public TaskTracker(JobConf conf) throws IOException {
originalConf = conf;
maxCurrentMapTasks = conf.getInt(
"mapred.tasktracker.map.tasks.maximum", 2);
maxCurrentReduceTasks = conf.getInt(
"mapred.tasktracker.reduce.tasks.maximum", 2);
this.jobTrackAddr = JobTracker.getAddress(conf);
String infoAddr =
NetUtils.getServerAddress(conf,
"tasktracker.http.bindAddress",
"tasktracker.http.port",
"mapred.task.tracker.http.address");
InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
String httpBindAddress = infoSocAddr.getHostName();
int httpPort = infoSocAddr.getPort();
this.server = new HttpServer("task", httpBindAddress, httpPort,
httpPort == 0, conf);
workerThreads = conf.getInt("tasktracker.http.threads", 40);
this.shuffleServerMetrics = new ShuffleServerMetrics(conf);
server.setThreads(1, workerThreads);
// let the jsp pages get to the task tracker, config, and other relevant
// objects
FileSystem local = FileSystem.getLocal(conf);
this.localDirAllocator = new LocalDirAllocator("mapred.local.dir");
server.setAttribute("task.tracker", this);
server.setAttribute("local.file.system", local);
server.setAttribute("conf", conf);
server.setAttribute("log", LOG);
server.setAttribute("localDirAllocator", localDirAllocator);
server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
server.addInternalServlet("taskLog", "/tasklog", TaskLogServlet.class);
server.start();
this.httpPort = server.getPort();
checkJettyPort(httpPort);
initialize();
}


The full initialize function follows; the code relevant to task execution is in the last few lines:

  synchronized void initialize() throws IOException {
// use configured nameserver & interface to get local hostname
this.fConf = new JobConf(originalConf);
if (fConf.get("slave.host.name") != null) {
this.localHostname = fConf.get("slave.host.name");
}
if (localHostname == null) {
this.localHostname =
DNS.getDefaultHost
(fConf.get("mapred.tasktracker.dns.interface","default"),
fConf.get("mapred.tasktracker.dns.nameserver","default"));
}

//check local disk
checkLocalDirs(this.fConf.getLocalDirs());
fConf.deleteLocalFiles(SUBDIR);

// Clear out state tables
this.tasks.clear();
this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
this.runningJobs = new TreeMap<JobID, RunningJob>();
this.mapTotal = 0;
this.reduceTotal = 0;
this.acceptNewTasks = true;
this.status = null;

this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
//tweak the probe sample size (make it a function of numCopiers)
probe_sample_size = this.fConf.getInt("mapred.tasktracker.events.batchsize", 500);

Class<? extends TaskTrackerInstrumentation> metricsInst = getInstrumentationClass(fConf);
try {
java.lang.reflect.Constructor<? extends TaskTrackerInstrumentation> c =
metricsInst.getConstructor(new Class[] {TaskTracker.class} );
this.myInstrumentation = c.newInstance(this);
} catch(Exception e) {
//Reflection can throw lots of exceptions -- handle them all by
//falling back on the default.
LOG.error("failed to initialize taskTracker metrics", e);
this.myInstrumentation = new TaskTrackerMetricsInst(this);
}

// bind address
String address =
NetUtils.getServerAddress(fConf,
"mapred.task.tracker.report.bindAddress",
"mapred.task.tracker.report.port",
"mapred.task.tracker.report.address");
InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
String bindAddress = socAddr.getHostName();
int tmpPort = socAddr.getPort();

this.jvmManager = new JvmManager(this);

// Set service-level authorization security policy
if (this.fConf.getBoolean(
ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
PolicyProvider policyProvider =
(PolicyProvider)(ReflectionUtils.newInstance(
this.fConf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG,
MapReducePolicyProvider.class, PolicyProvider.class),
this.fConf));
SecurityUtil.setPolicy(new ConfiguredPolicy(this.fConf, policyProvider));
}

// RPC initialization
int max = maxCurrentMapTasks > maxCurrentReduceTasks ?
maxCurrentMapTasks : maxCurrentReduceTasks;
//set the num handlers to max*2 since canCommit may wait for the duration
//of a heartbeat RPC
this.taskReportServer =
RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
this.taskReportServer.start();

// get the assigned address
this.taskReportAddress = taskReportServer.getListenerAddress();
this.fConf.set("mapred.task.tracker.report.address",
taskReportAddress.getHostName() + ":" + taskReportAddress.getPort());
LOG.info("TaskTracker up at: " + this.taskReportAddress);

this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress;
LOG.info("Starting tracker " + taskTrackerName);

// Clear out temporary files that might be lying around
DistributedCache.purgeCache(this.fConf);
cleanupStorage();

this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
InterTrackerProtocol.versionID,
jobTrackAddr, this.fConf);
this.justInited = true;
this.running = true;
// start the thread that will fetch map task completion events
this.mapEventsFetcher = new MapEventsFetcherThread();
mapEventsFetcher.setDaemon(true);
mapEventsFetcher.setName(
"Map-events fetcher for all reduce tasks " + "on " +
taskTrackerName);
mapEventsFetcher.start();

initializeMemoryManagement();

this.indexCache = new IndexCache(this.fConf);

mapLauncher = new TaskLauncher(maxCurrentMapTasks);
reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
mapLauncher.start();
reduceLauncher.start();
}


TaskLauncher is an inner class of TaskTracker that extends Thread. Its run method first waits for a task to arrive, then waits for a free slot; when both are available, it picks a task in a suitable state and runs startNewTask(tip):

    public void run() {
while (!Thread.interrupted()) {
try {
TaskInProgress tip;
synchronized (tasksToLaunch) {
while (tasksToLaunch.isEmpty()) {
tasksToLaunch.wait();
}
//get the TIP
tip = tasksToLaunch.remove(0);
LOG.info("Trying to launch : " + tip.getTask().getTaskID());
}
//wait for a slot to run
synchronized (numFreeSlots) {
while (numFreeSlots.get() == 0) {
numFreeSlots.wait();
}
LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
" and trying to launch "+tip.getTask().getTaskID());
numFreeSlots.set(numFreeSlots.get() - 1);
assert (numFreeSlots.get() >= 0);
}
synchronized (tip) {
//to make sure that there is no kill task action for this
if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
//got killed externally while still in the launcher queue
addFreeSlot();
continue;
}
tip.slotTaken = true;
}
//got a free slot. launch the task
startNewTask(tip);
} catch (InterruptedException e) {
return; // ALL DONE
} catch (Throwable th) {
LOG.error("TaskLauncher error " +
StringUtils.stringifyException(th));
}
}
}
}


This calls the outer class's startNewTask:

  private void startNewTask(TaskInProgress tip) {
try {
localizeJob(tip);
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
LOG.warn(msg);
tip.reportDiagnosticInfo(msg);
try {
tip.kill(true);
tip.cleanup(true);
} catch (IOException ie2) {
LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
StringUtils.stringifyException(ie2));
}

// Careful!
// This might not be an 'Exception' - don't handle 'Error' here!
if (e instanceof Error) {
throw ((Error) e);
}
}
}


localizeJob(tip); does not look like the call that actually runs the program, but it is the only statement in the try block, so let's look at localizeJob in detail. It mainly creates a JobConf instance, points that instance's jar and xml paths at the local filesystem, and copies the job's jar and xml files to the local machine (the splits are not copied).

  private void localizeJob(TaskInProgress tip) throws IOException {
Path localJarFile = null;
Task t = tip.getTask();
JobID jobId = t.getJobID();
Path jobFile = new Path(t.getJobFile());
// Get sizes of JobFile and JarFile
// sizes are -1 if they are not present.
FileStatus status = null;
long jobFileSize = -1;
try {
status = systemFS.getFileStatus(jobFile);
jobFileSize = status.getLen();
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
Path localJobFile = lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
if (!rjob.localized) {

FileSystem localFs = FileSystem.getLocal(fConf);
// this will happen on a partial execution of localizeJob.
// Sometimes the job.xml gets copied but copying job.jar
// might throw out an exception
// we should clean up and then try again
Path jobDir = localJobFile.getParent();
if (localFs.exists(jobDir)){
localFs.delete(jobDir, true);
boolean b = localFs.mkdirs(jobDir);
if (!b)
throw new IOException("Not able to create job directory "
+ jobDir.toString());
}
systemFS.copyToLocalFile(jobFile, localJobFile);
JobConf localJobConf = new JobConf(localJobFile);

// create the 'work' directory
// job-specific shared directory for use as scratch space
Path workDir = lDirAlloc.getLocalPathForWrite(
(getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "work"), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
}
System.setProperty("job.local.dir", workDir.toString());
localJobConf.set("job.local.dir", workDir.toString());

// copy Jar file to the local FS and unjar it.
String jarFile = localJobConf.getJar();
long jarFileSize = -1;
if (jarFile != null) {
Path jarFilePath = new Path(jarFile);
try {
status = systemFS.getFileStatus(jarFilePath);
jarFileSize = status.getLen();
} catch(FileNotFoundException fe) {
jarFileSize = -1;
}
// Here we check for and we check five times the size of jarFileSize
// to accommodate for unjarring the jar file in work directory
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "jars",
5 * jarFileSize, fConf), "job.jar");
if (!localFs.mkdirs(localJarFile.getParent())) {
throw new IOException("Mkdirs failed to create jars directory ");
}
systemFS.copyToLocalFile(jarFilePath, localJarFile);
localJobConf.setJar(localJarFile.toString());
OutputStream out = localFs.create(localJobFile);
try {
localJobConf.writeXml(out);
} finally {
out.close();
}
// also unjar the job.jar files
RunJar.unJar(new File(localJarFile.toString()),
new File(localJarFile.getParent().toString()));
}
rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
localJobConf.getKeepFailedTaskFiles());
rjob.localized = true;
rjob.jobConf = localJobConf;
}
}
launchTaskForJob(tip, new JobConf(rjob.jobConf));
}


launchTaskForJob

  private void launchTaskForJob(TaskInProgress tip, JobConf jobConf) throws IOException{
synchronized (tip) {
tip.setJobConf(jobConf);
tip.launchTask();
}
}

 

launchTask

    public synchronized void launchTask() throws IOException {
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
localizeTask(task);
if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
this.taskStatus.setRunState(TaskStatus.State.RUNNING);
}
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
} else {
LOG.info("Not launching task: " + task.getTaskID() +
" since it's state is " + this.taskStatus.getRunState());
}
}


TaskRunner's run method:

  public final void run() {
try {

//before preparing the job localize
//all the archives
TaskAttemptID taskid = t.getTaskID();
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File jobCacheDir = null;
if (conf.getJar() != null) {
jobCacheDir = new File(
new Path(conf.getJar()).getParent().toString());
}
File workDir = new File(lDirAlloc.getLocalPathToRead(
TaskTracker.getLocalTaskDir(
t.getJobID().toString(),
t.getTaskID().toString(),
t.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
conf). toString());

URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
FileStatus fileStatus;
FileSystem fileSystem;
Path localPath;
String baseDir;

if ((archives != null) || (files != null)) {
if (archives != null) {
String[] archivesTimestamps =
DistributedCache.getArchiveTimestamps(conf);
Path[] p = new Path[archives.length];
for (int i = 0; i < archives.length;i++){
fileSystem = FileSystem.get(archives[i], conf);
fileStatus = fileSystem.getFileStatus(
new Path(archives[i].getPath()));
String cacheId = DistributedCache.makeRelative(archives[i],conf);
String cachePath = TaskTracker.getCacheSubdir() +
Path.SEPARATOR + cacheId;

localPath = lDirAlloc.getLocalPathForWrite(cachePath,
fileStatus.getLen(), conf);
baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(archives[i], conf,
new Path(baseDir),
fileStatus,
true, Long.parseLong(
archivesTimestamps[i]),
new Path(workDir.
getAbsolutePath()),
false);

}
DistributedCache.setLocalArchives(conf, stringifyPathArray(p));
}
if ((files != null)) {
String[] fileTimestamps = DistributedCache.getFileTimestamps(conf);
Path[] p = new Path[files.length];
for (int i = 0; i < files.length;i++){
fileSystem = FileSystem.get(files[i], conf);
fileStatus = fileSystem.getFileStatus(
new Path(files[i].getPath()));
String cacheId = DistributedCache.makeRelative(files[i], conf);
String cachePath = TaskTracker.getCacheSubdir() +
Path.SEPARATOR + cacheId;

localPath = lDirAlloc.getLocalPathForWrite(cachePath,
fileStatus.getLen(), conf);
baseDir = localPath.toString().replace(cacheId, "");
p[i] = DistributedCache.getLocalCache(files[i], conf,
new Path(baseDir),
fileStatus,
false, Long.parseLong(
fileTimestamps[i]),
new Path(workDir.
getAbsolutePath()),
false);
}
DistributedCache.setLocalFiles(conf, stringifyPathArray(p));
}
Path localTaskFile = new Path(t.getJobFile());
FileSystem localFs = FileSystem.getLocal(conf);
localFs.delete(localTaskFile, true);
OutputStream out = localFs.create(localTaskFile);
try {
conf.writeXml(out);
} finally {
out.close();
}
}

if (!prepare()) {
return;
}

String sep = System.getProperty("path.separator");
StringBuffer classPath = new StringBuffer();
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
if (!workDir.mkdirs()) {
if (!workDir.isDirectory()) {
LOG.fatal("Mkdirs failed to create " + workDir.toString());
}
}

String jar = conf.getJar();
if (jar != null) {
// if jar exists, it into workDir
File[] libs = new File(jobCacheDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.append(sep); // add libs from jar to classpath
classPath.append(libs[i]);
}
}
classPath.append(sep);
classPath.append(new File(jobCacheDir, "classes"));
classPath.append(sep);
classPath.append(jobCacheDir);

}

// include the user specified classpath

//archive paths
Path[] archiveClasspaths = DistributedCache.getArchiveClassPaths(conf);
if (archiveClasspaths != null && archives != null) {
Path[] localArchives = DistributedCache
.getLocalCacheArchives(conf);
if (localArchives != null){
for (int i=0;i<archives.length;i++){
for(int j=0;j<archiveClasspaths.length;j++){
if (archives[i].getPath().equals(
archiveClasspaths[j].toString())){
classPath.append(sep);
classPath.append(localArchives[i]
.toString());
}
}
}
}
}
//file paths
Path[] fileClasspaths = DistributedCache.getFileClassPaths(conf);
if (fileClasspaths!=null && files != null) {
Path[] localFiles = DistributedCache
.getLocalCacheFiles(conf);
if (localFiles != null) {
for (int i = 0; i < files.length; i++) {
for (int j = 0; j < fileClasspaths.length; j++) {
if (files[i].getPath().equals(
fileClasspaths[j].toString())) {
classPath.append(sep);
classPath.append(localFiles[i].toString());
}
}
}
}
}

classPath.append(sep);
classPath.append(workDir);
// Build exec child jmv args.
Vector<String> vargs = new Vector<String>(8);
File jvm = // use same jvm as parent
new File(new File(System.getProperty("java.home"), "bin"), "java");

vargs.add(jvm.toString());

// Add child (task) java-vm options.
//
// The following symbols if present in mapred.child.java.opts value are
// replaced:
// + @taskid@ is interpolated with value of TaskID.
// Other occurrences of @ will not be altered.
//
// Example with multiple arguments and substitutions, showing
// jvm GC logging, and start of a passwordless JVM JMX agent so can
// connect with jconsole and the likes to watch child memory, threads
// and get thread dumps.
//
// <property>
// <name>mapred.child.java.opts</name>
// <value>-verbose:gc -Xloggc:/tmp/@taskid@.gc \
// -Dcom.sun.management.jmxremote.authenticate=false \
// -Dcom.sun.management.jmxremote.ssl=false \
// </value>
// </property>
//
String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
javaOpts = javaOpts.replace("@taskid@", taskid.toString());
String [] javaOptsSplit = javaOpts.split(" ");

// Add java.library.path; necessary for loading native libraries.
//
// 1. To support native-hadoop library i.e. libhadoop.so, we add the
// parent processes' java.library.path to the child.
// 2. We also add the 'cwd' of the task to it's java.library.path to help
// users distribute native libraries via the DistributedCache.
// 3. The user can also specify extra paths to be added to the
// java.library.path via mapred.child.java.opts.
//
String libraryPath = System.getProperty("java.library.path");
if (libraryPath == null) {
libraryPath = workDir.getAbsolutePath();
} else {
libraryPath += sep + workDir;
}
boolean hasUserLDPath = false;
for(int i=0; i<javaOptsSplit.length ;i++) {
if(javaOptsSplit[i].startsWith("-Djava.library.path=")) {
javaOptsSplit[i] += sep + libraryPath;
hasUserLDPath = true;
break;
}
}
if(!hasUserLDPath) {
vargs.add("-Djava.library.path=" + libraryPath);
}
for (int i = 0; i < javaOptsSplit.length; i++) {
vargs.add(javaOptsSplit[i]);
}

// add java.io.tmpdir given by mapred.child.tmp
String tmp = conf.get("mapred.child.tmp", "./tmp");
Path tmpDir = new Path(tmp);

// if temp directory path is not absolute
// prepend it with workDir.
if (!tmpDir.isAbsolute()) {
tmpDir = new Path(workDir.toString(), tmp);
}
FileSystem localFs = FileSystem.getLocal(conf);
if (!localFs.mkdirs(tmpDir) && !localFs.getFileStatus(tmpDir).isDir()) {
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
}
vargs.add("-Djava.io.tmpdir=" + tmpDir.toString());

// Add classpath.
vargs.add("-classpath");
vargs.add(classPath.toString());

// Setup the log4j prop
long logSize = TaskLog.getTaskLogLength(conf);
vargs.add("-Dhadoop.log.dir=" +
new File(System.getProperty("hadoop.log.dir")
).getAbsolutePath());
vargs.add("-Dhadoop.root.logger=INFO,TLA");
vargs.add("-Dhadoop.tasklog.taskid=" + taskid);
vargs.add("-Dhadoop.tasklog.totalLogFileSize=" + logSize);

if (conf.getProfileEnabled()) {
if (conf.getProfileTaskRange(t.isMapTask()
).isIncluded(t.getPartition())) {
File prof = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.PROFILE);
vargs.add(String.format(conf.getProfileParams(), prof.toString()));
}
}

// Add main class and its arguments
vargs.add(Child.class.getName()); // main of Child
// pass umbilical address
InetSocketAddress address = tracker.getTaskTrackerReportAddress();
vargs.add(address.getAddress().getHostAddress());
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid.toString()); // pass task identifier

String pidFile = lDirAlloc.getLocalPathForWrite(
(TaskTracker.getPidFile(t.getJobID().toString(),
taskid.toString(), t.isTaskCleanupTask())),
this.conf).toString();
t.setPidFile(pidFile);
tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile);

// set memory limit using ulimit if feasible and necessary ...
String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
List<String> setup = null;
if (ulimitCmd != null) {
setup = new ArrayList<String>();
for (String arg : ulimitCmd) {
setup.add(arg);
}
}

// Set up the redirection of the task's stdout and stderr streams
File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
stdout.getParentFile().mkdirs();
tracker.getTaskTrackerInstrumentation().reportTaskLaunch(taskid, stdout, stderr);

Map<String, String> env = new HashMap<String, String>();
StringBuffer ldLibraryPath = new StringBuffer();
ldLibraryPath.append(workDir.toString());
String oldLdLibraryPath = null;
oldLdLibraryPath = System.getenv("LD_LIBRARY_PATH");
if (oldLdLibraryPath != null) {
ldLibraryPath.append(sep);
ldLibraryPath.append(oldLdLibraryPath);
}
env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
jvmManager.launchJvm(this,
jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize,
workDir, env, pidFile, conf));
synchronized (lock) {
while (!done) {
lock.wait();
}
}
tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
if (exitCodeSet) {
if (!killed && exitCode != 0) {
if (exitCode == 65) {
tracker.getTaskTrackerInstrumentation().taskFailedPing(t.getTaskID());
}
throw new IOException("Task process exit with nonzero status of " +
exitCode + ".");
}
}
} catch (FSError e) {
LOG.fatal("FSError", e);
try {
tracker.fsError(t.getTaskID(), e.getMessage());
} catch (IOException ie) {
LOG.fatal(t.getTaskID()+" reporting FSError", ie);
}
} catch (Throwable throwable) {
LOG.warn(t.getTaskID()+" Child Error", throwable);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
throwable.printStackTrace(new PrintStream(baos));
try {
tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
} catch (IOException e) {
LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
}
} finally {
try{
URI[] archives = DistributedCache.getCacheArchives(conf);
URI[] files = DistributedCache.getCacheFiles(conf);
if (archives != null){
for (int i = 0; i < archives.length; i++){
DistributedCache.releaseCache(archives[i], conf);
}
}
if (files != null){
for(int i = 0; i < files.length; i++){
DistributedCache.releaseCache(files[i], conf);
}
}
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
tip.reportTaskFinished();
}
}
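Putting all of the above together, the command handed to the JVM manager looks roughly like the following (every path, host, port and ID is illustrative, not from a real run; the trailing JVM id is appended later in JvmRunner.runChild):

$JAVA_HOME/bin/java \
  -Djava.library.path=<parent java.library.path>:<task workDir> \
  -Xmx200m \
  -Djava.io.tmpdir=<task workDir>/tmp \
  -classpath <job.jar, conf dir and framework jars> \
  -Dhadoop.log.dir=<log dir> -Dhadoop.root.logger=INFO,TLA \
  -Dhadoop.tasklog.taskid=<task attempt id> -Dhadoop.tasklog.totalLogFileSize=<size> \
  org.apache.hadoop.mapred.Child <tasktracker host> <tasktracker port> <task attempt id> <jvm id>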


jvmManager.launchJvm

  public void launchJvm(TaskRunner t, JvmEnv env) {
if (t.getTask().isMapTask()) {
mapJvmManager.reapJvm(t, env);
} else {
reduceJvmManager.reapJvm(t, env);
}
}


reapJvm

    private synchronized void reapJvm( 
TaskRunner t, JvmEnv env) {
if (t.getTaskInProgress().wasKilled()) {
//the task was killed in-flight
//no need to do the rest of the operations
return;
}
boolean spawnNewJvm = false;
JobID jobId = t.getTask().getJobID();
//Check whether there is a free slot to start a new JVM.
//,or, Kill a (idle) JVM and launch a new one
//When this method is called, we *must*
// (1) spawn a new JVM (if we are below the max)
// (2) find an idle JVM (that belongs to the same job), or,
// (3) kill an idle JVM (from a different job)
// (the order of return is in the order above)
int numJvmsSpawned = jvmIdToRunner.size();
JvmRunner runnerToKill = null;
if (numJvmsSpawned >= maxJvms) {
//go through the list of JVMs for all jobs.
Iterator<Map.Entry<JVMId, JvmRunner>> jvmIter =
jvmIdToRunner.entrySet().iterator();

while (jvmIter.hasNext()) {
JvmRunner jvmRunner = jvmIter.next().getValue();
JobID jId = jvmRunner.jvmId.getJobId();
//look for a free JVM for this job; if one exists then just break
if (jId.equals(jobId) && !jvmRunner.isBusy() && !jvmRunner.ranAll()){
setRunningTaskForJvm(jvmRunner.jvmId, t); //reserve the JVM
LOG.info("No new JVM spawned for jobId/taskid: " +
jobId+"/"+t.getTask().getTaskID() +
". Attempting to reuse: " + jvmRunner.jvmId);
return;
}
//Cases when a JVM is killed:
// (1) the JVM under consideration belongs to the same job
// (passed in the argument). In this case, kill only when
// the JVM ran all the tasks it was scheduled to run (in terms
// of count).
// (2) the JVM under consideration belongs to a different job and is
// currently not busy
//But in both the above cases, we see if we can assign the current
//task to an idle JVM (hence we continue the loop even on a match)
if ((jId.equals(jobId) && jvmRunner.ranAll()) ||
(!jId.equals(jobId) && !jvmRunner.isBusy())) {
runnerToKill = jvmRunner;
spawnNewJvm = true;
}
}
} else {
spawnNewJvm = true;
}

if (spawnNewJvm) {
if (runnerToKill != null) {
LOG.info("Killing JVM: " + runnerToKill.jvmId);
runnerToKill.kill();
}
spawnNewJvm(jobId, env, t);
return;
}
//*MUST* never reach this
throw new RuntimeException("Inconsistent state!!! " +
"JVM Manager reached an unstable state " +
"while reaping a JVM for task: " + t.getTask().getTaskID()+
" " + getDetails());
}
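The priority order encoded above can be condensed into a small pure function; a minimal sketch with hypothetical names (not the Hadoop API), just to make the outcomes explicit:

// Illustrative condensation of reapJvm's decision logic; all names are hypothetical.
public class ReapDecision {
    enum JvmDecision { SPAWN_NEW, REUSE_IDLE_SAME_JOB, KILL_IDLE_AND_SPAWN, INCONSISTENT }

    static JvmDecision decide(boolean belowMaxJvms,
                              boolean idleSameJobJvmExists,
                              boolean killableJvmExists) {
        if (belowMaxJvms)         return JvmDecision.SPAWN_NEW;           // a free slot exists
        if (idleSameJobJvmExists) return JvmDecision.REUSE_IDLE_SAME_JOB; // cheapest path
        if (killableJvmExists)    return JvmDecision.KILL_IDLE_AND_SPAWN; // ran-all same-job, or idle other-job
        return JvmDecision.INCONSISTENT;                                  // reapJvm throws RuntimeException here
    }

    public static void main(String[] args) {
        System.out.println(decide(false, true, false)); // REUSE_IDLE_SAME_JOB
    }
}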


spawnNewJvm

    private void spawnNewJvm(JobID jobId, JvmEnv env,  
TaskRunner t) {
JvmRunner jvmRunner = new JvmRunner(env,jobId);
jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
//spawn the JVM in a new thread. Note that there will be very little
//extra overhead of launching the new thread for a new JVM since
//most of the cost is involved in launching the process. Moreover,
//since we are going to be using the JVM for running many tasks,
//the thread launch cost becomes trivial when amortized over all
//tasks. Doing it this way also keeps code simple.
jvmRunner.setDaemon(true);
jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
setRunningTaskForJvm(jvmRunner.jvmId, t);
LOG.info(jvmRunner.getName());
jvmRunner.start();
}


JvmRunner is a private inner class of JvmManagerForType:


    private class JvmRunner extends Thread {
JvmEnv env;
volatile boolean killed = false;
volatile int numTasksRan;
final int numTasksToRun;
JVMId jvmId;
volatile boolean busy = true;
private ShellCommandExecutor shexec; // shell terminal for running the task
public JvmRunner(JvmEnv env, JobID jobId) {
this.env = env;
this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
}
public void run() {
runChild(env);
}

public void runChild(JvmEnv env) {
try {
env.vargs.add(Integer.toString(jvmId.getId()));
List<String> wrappedCommand =
TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
env.logSize, env.pidFile);
shexec = new ShellCommandExecutor(wrappedCommand.toArray(new String[0]),
env.workDir, env.env);
shexec.execute();
} catch (IOException ioe) {
// do nothing
// error and output are appropriately redirected
} finally { // handle the exit code
if (shexec == null) {
return;
}
int exitCode = shexec.getExitCode();
updateOnJvmExit(jvmId, exitCode, killed);
LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
numTasksRan);
try {
// In case of jvm-reuse,
//the task jvm cleans up the common workdir for every
//task at the beginning of each task in the task JVM.
//For the last task, we do it here.
if (env.conf.getNumTasksToExecutePerJvm() != 1) {
FileUtil.fullyDelete(env.workDir);
}
} catch (IOException ie){}
}
}


ShellCommandExecutor is a shell command executor: the arguments passed to its constructor form the command to run, and its execute() simply delegates to run() in the parent class Shell:

    public void execute() throws IOException {
this.run();
}


run() in turn calls runCommand(), where a ProcessBuilder is created; that ProcessBuilder ultimately launches the child process that executes the task:

  /** check to see if a command needs to be executed and execute if needed */
protected void run() throws IOException {
if (lastTime + interval > System.currentTimeMillis())
return;
exitCode = 0; // reset for next run
runCommand();
}

/** Run a command */
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
boolean completed = false;

if (environment != null) {
builder.environment().putAll(this.environment);
}
if (dir != null) {
builder.directory(this.dir);
}

process = builder.start();
final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process
.getErrorStream()));
BufferedReader inReader =
new BufferedReader(new InputStreamReader(process
.getInputStream()));
final StringBuffer errMsg = new StringBuffer();

// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch(IOException ioe) {
LOG.warn("Error reading the error stream", ioe);
}
}
};
try {
errThread.start();
} catch (IllegalStateException ise) { }
try {
parseExecResult(inReader); // parse the output
// clear the input stream buffer
String line = inReader.readLine();
while(line != null) {
line = inReader.readLine();
}
// wait for the process to finish and check the exit code
exitCode = process.waitFor();
try {
// make sure that the error thread exits
errThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted while reading the error stream", ie);
}
completed = true;
if (exitCode != 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
// close the input stream
try {
inReader.close();
} catch (IOException ioe) {
LOG.warn("Error while closing the input stream", ioe);
}
if (!completed) {
errThread.interrupt();
}
try {
errReader.close();
} catch (IOException ioe) {
LOG.warn("Error while closing the error stream", ioe);
}
process.destroy();
lastTime = System.currentTimeMillis();
}
}
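Stripped of the Hadoop plumbing, the pattern is: start the process, drain stderr on a helper thread so the child can never block on a full pipe buffer, consume stdout on the current thread, then wait for the exit code. A minimal self-contained sketch (plain Java, Unix-style command, not Hadoop code):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class MiniShell {
    public static void main(String[] args) throws IOException, InterruptedException {
        Process process = new ProcessBuilder("echo", "hello").start();

        // Drain stderr concurrently, as runCommand's errThread does.
        BufferedReader err = new BufferedReader(new InputStreamReader(process.getErrorStream()));
        Thread errThread = new Thread(() -> {
            try {
                String line;
                while ((line = err.readLine()) != null) {
                    System.err.println(line);
                }
            } catch (IOException ignored) { }
        });
        errThread.start();

        // Consume stdout on the current thread (parseExecResult's role in Shell).
        try (BufferedReader in = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            String line;
            while ((line = in.readLine()) != null) {
                System.out.println(line);
            }
        }

        int exitCode = process.waitFor(); // nonzero would become an ExitCodeException in Shell
        errThread.join();
        System.out.println("exit code: " + exitCode);
    }
}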

Afterword

Creating splits for the job

In JobClient.submitJobInternal, the part that creates the splits for the job is as follows:

    // Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
int maps;
if (job.getUseNewMapper()) {
maps = writeNewSplits(context, submitSplitFile);
} else {
maps = writeOldSplits(job, submitSplitFile);
}
job.set("mapred.job.split.file", submitSplitFile.toString());
job.setNumMapTasks(maps);
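
These two settings are the handoff point to the JobTracker side: JobInProgress.initTasks later reads mapred.job.split.file back out of the configuration (see the initTasks excerpt further below), and the returned split count fixes the number of map tasks.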


writeNewSplits

  int writeNewSplits(JobContext job, Path submitSplitFile
) throws IOException, InterruptedException,
ClassNotFoundException {
JobConf conf = job.getJobConf();
org.apache.hadoop.mapreduce.InputFormat<?,?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());

List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);
T[] array = (T[])
splits.toArray(new org.apache.hadoop.mapreduce.InputSplit[splits.size()]);

// sort the splits into order based on size, so that the biggest
// go first
Arrays.sort(array, new NewSplitComparator());
DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
array.length);
try {
if (array.length != 0) {
DataOutputBuffer buffer = new DataOutputBuffer();
RawSplit rawSplit = new RawSplit();
SerializationFactory factory = new SerializationFactory(conf);
Serializer<T> serializer =
factory.getSerializer((Class<T>) array[0].getClass());
serializer.open(buffer);
for(T split: array) {
rawSplit.setClassName(split.getClass().getName());
buffer.reset();
serializer.serialize(split);
rawSplit.setDataLength(split.getLength());
rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
rawSplit.setLocations(split.getLocations());
rawSplit.write(out);
}
serializer.close();
}
} finally {
out.close();
}
return array.length;
}
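From the setter calls above, each record written to the split file carries four things: the concrete split class name, the logical data length, the serialized split bytes, and the host locations. A simplified stand-in for the write side (the real RawSplit is a Hadoop Writable; this sketch only illustrates the shape of the record):

import java.io.DataOutput;
import java.io.IOException;

// Illustrative stand-in for JobClient.RawSplit; field layout inferred from the calls above.
class SimpleRawSplit {
    String className;   // concrete InputSplit class, e.g. FileSplit
    long dataLength;    // logical length from InputSplit.getLength()
    byte[] bytes;       // split serialized via the SerializationFactory
    String[] locations; // hosts that hold the underlying blocks

    void write(DataOutput out) throws IOException {
        out.writeUTF(className);
        out.writeLong(dataLength);
        out.writeInt(bytes.length);
        out.write(bytes);
        out.writeInt(locations.length);
        for (String host : locations) {
            out.writeUTF(host);
        }
    }
}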


Here input is a FileInputFormat (for this job it resolves to TextInputFormat, a FileInputFormat subclass):

    org.apache.hadoop.mapreduce.InputFormat<?,?> input =
ReflectionUtils.newInstance(job.getInputFormatClass(), job.getJobConf());

List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);


Its getSplits first scans the input directory and builds one or more FileSplits for every file under it; a FileSplit is constructed from the file path, the start offset, the split length, and the hosts holding that portion of the data. (Note: the listing below is the old-API org.apache.hadoop.mapred.FileInputFormat.getSplits; the new-API counterpart differs in signature, but the block-based splitting logic is the same.)

  public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
FileStatus[] files = listStatus(job);

long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDir()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file: files) {
Path path = file.getPath();
FileSystem fs = path.getFileSystem(job);
long length = file.getLen();
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
if ((length != 0) && isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
String[] splitHosts = getSplitHosts(blkLocations,
length-bytesRemaining, splitSize, clusterMap);
splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
splitHosts));
bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkLocations.length-1].getHosts()));
}
} else if (length != 0) {
String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
splits.add(new FileSplit(path, 0, length, splitHosts));
} else {
//Create empty hosts array for zero length files
splits.add(new FileSplit(path, 0, length, new String[0]));
}
}
LOG.debug("Total # of splits: " + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
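computeSplitSize (not shown above) reduces in this era of FileInputFormat to Math.max(minSize, Math.min(goalSize, blockSize)), and SPLIT_SLOP lets the last chunk absorb up to 10% extra rather than produce a tiny tail split. A worked sketch of the arithmetic (constants and values assumed for illustration):

// Worked example of the split-size arithmetic; constants and values are illustrative.
public class SplitMath {
    static final double SPLIT_SLOP = 1.1; // assumed value of FileInputFormat.SPLIT_SLOP

    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize)); // assumed implementation
    }

    public static void main(String[] args) {
        long length    = 200L * 1024 * 1024; // one 200 MB input file
        long blockSize = 64L  * 1024 * 1024; // 64 MB HDFS blocks
        long goalSize  = length / 2;         // numSplits hint of 2
        long minSize   = 1;                  // mapred.min.split.size default
        long splitSize = computeSplitSize(goalSize, minSize, blockSize); // = 64 MB

        int count = 0;
        long bytesRemaining = length;
        while ((double) bytesRemaining / splitSize > SPLIT_SLOP) {
            bytesRemaining -= splitSize;
            count++;
        }
        if (bytesRemaining != 0) count++; // the tail split (8 MB here)
        System.out.println(count + " splits"); // prints: 4 splits (64 + 64 + 64 + 8 MB)
    }
}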


Back in writeNewSplits: immediately after List<org.apache.hadoop.mapreduce.InputSplit> splits = input.getSplits(job);, the splits are serialized into an array of RawSplit records and written out to submitSplitFile (the job.split file under the job's submit directory).

Here,

          rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());

copies the serialized bytes out of the shared buffer. setBytes delegates to BytesWritable.set (bytes is a BytesWritable), which performs the actual copy:

    public void setBytes(byte[] data, int offset, int length) {
bytes.set(data, offset, length);
}

 

  public void set(byte[] newData, int offset, int length) {
setSize(0);
setSize(length);
System.arraycopy(newData, offset, bytes, 0, size);
}
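The copy is essential because the single DataOutputBuffer is reset and refilled for every split; without it, every RawSplit would alias the same backing array and end up holding the bytes of the last split only. A tiny pure-Java illustration of the aliasing hazard:

import java.util.Arrays;

public class BufferReuse {
    public static void main(String[] args) {
        byte[] backing = new byte[8];     // stands in for DataOutputBuffer's backing array
        byte[][] records = new byte[2][];

        for (int i = 0; i < 2; i++) {
            Arrays.fill(backing, (byte) 0);         // "buffer.reset()"
            backing[0] = (byte) ('A' + i);          // "serializer.serialize(split)"
            // records[i] = backing;                // WRONG: both records would alias one array
            records[i] = Arrays.copyOf(backing, 1); // right: copy, as BytesWritable.set does
        }
        System.out.println((char) records[0][0] + " " + (char) records[1][0]); // prints: A B
    }
}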


JobInProgress.initTasks() uses the splits

    //
// read input splits and create a map per a split
//
String jobFile = profile.getJobFile();

Path sysDir = new Path(this.jobtracker.getSystemDir());
FileSystem fs = sysDir.getFileSystem(conf);
DataInputStream splitFile =
fs.open(new Path(conf.get("mapred.job.split.file")));
JobClient.RawSplit[] splits;
try {
splits = JobClient.readSplitFile(splitFile);
} finally {
splitFile.close();
}
numMapTasks = splits.length;

The static method JobClient.readSplitFile reads the RawSplits back from the split file written during job submission:

  static RawSplit[] readSplitFile(DataInput in) throws IOException {
byte[] header = new byte[SPLIT_FILE_HEADER.length];
in.readFully(header);
if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
throw new IOException("Invalid header on split file");
}
int vers = WritableUtils.readVInt(in);
if (vers != CURRENT_SPLIT_FILE_VERSION) {
throw new IOException("Unsupported split version " + vers);
}
int len = WritableUtils.readVInt(in);
RawSplit[] result = new RawSplit[len];
for(int i=0; i < len; ++i) {
result[i] = new RawSplit();
result[i].readFields(in);
}
return result;
}
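So the on-disk framing is: fixed header bytes, a vint format version, a vint record count, then the RawSplit records back to back. The write side (writeSplitsFileHeader, not shown above) presumably mirrors it; a hedged sketch, with the two constants assumed rather than copied from JobClient:

import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

class SplitFileFraming {
    // Assumed values; see JobClient.SPLIT_FILE_HEADER / CURRENT_SPLIT_FILE_VERSION for the real ones.
    static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();
    static final int CURRENT_SPLIT_FILE_VERSION = 0;

    static void writeHeader(DataOutputStream out, int numSplits) throws IOException {
        out.write(SPLIT_FILE_HEADER);                              // magic bytes
        WritableUtils.writeVInt(out, CURRENT_SPLIT_FILE_VERSION);  // format version
        WritableUtils.writeVInt(out, numSplits);                   // record count
        // ... followed by numSplits calls to rawSplit.write(out)
    }
}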


initTasks then uses the RawSplits to create the map tasks, one TaskInProgress per split:

    maps = new TaskInProgress[numMapTasks];
for(int i=0; i < numMapTasks; ++i) {
inputLength += splits[i].getDataLength();
maps[i] = new TaskInProgress(jobId, jobFile,
splits[i],
jobtracker, conf, this, i);
}