Mesos源码分析(12): Mesos-Slave接收到RunTask消息

时间:2023-03-09 20:05:43
Mesos源码分析(12): Mesos-Slave接收到RunTask消息

在前文Mesos源码分析(8): Mesos-Slave的初始化中,Mesos-Slave接收到RunTaskMessage消息,会调用Slave::runTask.

 

  1. void Slave::runTask(
  2.     const UPID& from,
  3.     const FrameworkInfo& frameworkInfo,
  4.     const FrameworkID& frameworkId_,
  5.     const UPID& pid,
  6.     TaskInfo task)
  7. {
  8. ……
  9.   // Create frameworkId alias to use in the rest of the function.
  10.   const FrameworkID frameworkId = frameworkInfo.id();
  11.   LOG(INFO) << "Got assigned task " << task.task_id()
  12.             << " for framework " << frameworkId;
  13. ……
  14.   CHECK(state == RECOVERING || state == DISCONNECTED ||
  15.         state == RUNNING || state == TERMINATING)
  16.     << state;
  17.   // TODO(bmahler): Also ignore if we're DISCONNECTED.
  18.   if (state == RECOVERING || state == TERMINATING) {
  19.     LOG(WARNING) << "Ignoring task " << task.task_id()
  20.                  << " because the slave is " << state;
  21.     // TODO(vinod): Consider sending a TASK_LOST here.
  22.     // Currently it is tricky because 'statusUpdate()'
  23.     // ignores updates for unknown frameworks.
  24.     return;
  25.   }
  26.   Future<bool> unschedule = true;
  27.   // If we are about to create a new framework, unschedule the work
  28.   // and meta directories from getting gc'ed.
  29.   Framework* framework = getFramework(frameworkId);
  30.   if (framework == NULL) {
  31.     // Unschedule framework work directory.
  32.     string path = paths::getFrameworkPath(
  33.         flags.work_dir, info.id(), frameworkId);
  34.     if (os::exists(path)) {
  35.       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  36.     }
  37.     // Unschedule framework meta directory.
  38.     path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
  39.     if (os::exists(path)) {
  40.       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  41.     }
  42.     Option<UPID> frameworkPid = None();
  43.     if (pid != UPID()) {
  44.       frameworkPid = pid;
  45.     }
  46.     framework = new Framework(this, frameworkInfo, frameworkPid);
  47.     frameworks[frameworkId] = framework;
  48.     if (frameworkInfo.checkpoint()) {
  49.       framework->checkpointFramework();
  50.     }
  51.     // Is this same framework in completedFrameworks? If so, move the completed
  52.     // executors to this framework and remove it from that list.
  53.     // TODO(brenden): Consider using stout/cache.hpp instead of boost
  54.     // circular_buffer.
  55.     for (auto it = completedFrameworks.begin(), end = completedFrameworks.end();
  56.          it != end;
  57.          ++it) {
  58.       if ((*it)->id() == frameworkId) {
  59.         framework->completedExecutors = (*it)->completedExecutors;
  60.         completedFrameworks.erase(it);
  61.         break;
  62.       }
  63.     }
  64.   }
  65.   const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
  66.   const ExecutorID& executorId = executorInfo.executor_id();
  67.   if (HookManager::hooksAvailable()) {
  68.     // Set task labels from run task label decorator.
  69.     task.mutable_labels()->CopyFrom(HookManager::slaveRunTaskLabelDecorator(
  70.         task, executorInfo, frameworkInfo, info));
  71.   }
  72.   // We add the task to 'pending' to ensure the framework is not
  73.   // removed and the framework and top level executor directories
  74.   // are not scheduled for deletion before '_runTask()' is called.
  75.   CHECK_NOTNULL(framework);
  76.   framework->pending[executorId][task.task_id()] = task;
  77.   // If we are about to create a new executor, unschedule the top
  78.   // level work and meta directories from getting gc'ed.
  79.   Executor* executor = framework->getExecutor(executorId);
  80.   if (executor == NULL) {
  81.     // Unschedule executor work directory.
  82.     string path = paths::getExecutorPath(
  83.         flags.work_dir, info.id(), frameworkId, executorId);
  84.     if (os::exists(path)) {
  85.       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  86.     }
  87.     // Unschedule executor meta directory.
  88.     path = paths::getExecutorPath(metaDir, info.id(), frameworkId, executorId);
  89.     if (os::exists(path)) {
  90.       unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
  91.     }
  92.   }
  93.   // Run the task after the unschedules are done.
  94.   unschedule.onAny(
  95.       defer(self(), &Self::_runTask, lambda::_1, frameworkInfo, task));
  96. }

 

最终调用Slave::_runTask

  1. void Slave::_runTask(
  2.     const Future<bool>& future,
  3.     const FrameworkInfo& frameworkInfo,
  4.     const TaskInfo& task)
  5. {
  6.   const FrameworkID frameworkId = frameworkInfo.id();
  7.   LOG(INFO) << "Launching task " << task.task_id()
  8.             << " for framework " << frameworkId;
  9.   Framework* framework = getFramework(frameworkId);
  10.   const ExecutorInfo executorInfo = getExecutorInfo(frameworkInfo, task);
  11.   const ExecutorID& executorId = executorInfo.executor_id();
  12.   if (framework->pending.contains(executorId) &&
  13.       framework->pending[executorId].contains(task.task_id())) {
  14.     framework->pending[executorId].erase(task.task_id());
  15.     if (framework->pending[executorId].empty()) {
  16.       framework->pending.erase(executorId);
  17.       // NOTE: Ideally we would perform the following check here:
  18.       //
  19.       // if (framework->executors.empty() &&
  20.       // framework->pending.empty()) {
  21.       // removeFramework(framework);
  22.       // }
  23.       //
  24.       // However, we need 'framework' to stay valid for the rest of
  25.       // this function. As such, we perform the check before each of
  26.       // the 'return' statements below.
  27.     }
  28.   } else {
  29.     LOG(WARNING) << "Ignoring run task " << task.task_id()
  30.                  << " of framework " << frameworkId
  31.                  << " because the task has been killed in the meantime";
  32.     return;
  33.   }
  34.   // We don't send a status update here because a terminating
  35.   // framework cannot send acknowledgements.
  36.   if (framework->state == Framework::TERMINATING) {
  37.     LOG(WARNING) << "Ignoring run task " << task.task_id()
  38.                  << " of framework " << frameworkId
  39.                  << " because the framework is terminating";
  40.     // Refer to the comment after 'framework->pending.erase' above
  41.     // for why we need this.
  42.     if (framework->executors.empty() && framework->pending.empty()) {
  43.       removeFramework(framework);
  44.     }
  45.     return;
  46.   }
  47.   if (!future.isReady()) {
  48.     LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
  49.                << (future.isFailed() ? future.failure() : "future discarded");
  50.     const StatusUpdate update = protobuf::createStatusUpdate(
  51.         frameworkId,
  52.         info.id(),
  53.         task.task_id(),
  54.         TASK_LOST,
  55.         TaskStatus::SOURCE_SLAVE,
  56.         UUID::random(),
  57.         "Could not launch the task because we failed to unschedule directories"
  58.         " scheduled for gc",
  59.         TaskStatus::REASON_GC_ERROR);
  60.     // TODO(vinod): Ensure that the status update manager reliably
  61.     // delivers this update. Currently, we don't guarantee this
  62.     // because removal of the framework causes the status update
  63.     // manager to stop retrying for its un-acked updates.
  64.     statusUpdate(update, UPID());
  65.     // Refer to the comment after 'framework->pending.erase' above
  66.     // for why we need this.
  67.     if (framework->executors.empty() && framework->pending.empty()) {
  68.       removeFramework(framework);
  69.     }
  70.     return;
  71.   }
  72.   // NOTE: If the task or executor uses resources that are
  73.   // checkpointed on the slave (e.g. persistent volumes), we should
  74.   // already know about it. If the slave doesn't know about them (e.g.
  75.   // CheckpointResourcesMessage was dropped or came out of order),
  76.   // we send TASK_LOST status updates here since restarting the task
  77.   // may succeed in the event that CheckpointResourcesMessage arrives
  78.   // out of order.
  79.   Resources checkpointedTaskResources =
  80.     Resources(task.resources()).filter(needCheckpointing);
  81.   foreach (const Resource& resource, checkpointedTaskResources) {
  82.     if (!checkpointedResources.contains(resource)) {
  83.       LOG(WARNING) << "Unknown checkpointed resource " << resource
  84.                    << " for task " << task.task_id()
  85.                    << " of framework " << frameworkId;
  86.       const StatusUpdate update = protobuf::createStatusUpdate(
  87.           frameworkId,
  88.           info.id(),
  89.           task.task_id(),
  90.           TASK_LOST,
  91.           TaskStatus::SOURCE_SLAVE,
  92.           UUID::random(),
  93.           "The checkpointed resources being used by the task are unknown to "
  94.           "the slave",
  95.           TaskStatus::REASON_RESOURCES_UNKNOWN);
  96.       statusUpdate(update, UPID());
  97.       // Refer to the comment after 'framework->pending.erase' above
  98.       // for why we need this.
  99.       if (framework->executors.empty() && framework->pending.empty()) {
  100.         removeFramework(framework);
  101.       }
  102.       return;
  103.     }
  104.   }
  105.   if (task.has_executor()) {
  106.     Resources checkpointedExecutorResources =
  107.       Resources(task.executor().resources()).filter(needCheckpointing);
  108.     foreach (const Resource& resource, checkpointedExecutorResources) {
  109.       if (!checkpointedResources.contains(resource)) {
  110.         LOG(WARNING) << "Unknown checkpointed resource " << resource
  111.                      << " for executor '" << task.executor().executor_id()
  112.                      << "' of framework " << frameworkId;
  113.         const StatusUpdate update = protobuf::createStatusUpdate(
  114.             frameworkId,
  115.             info.id(),
  116.             task.task_id(),
  117.             TASK_LOST,
  118.             TaskStatus::SOURCE_SLAVE,
  119.             UUID::random(),
  120.             "The checkpointed resources being used by the executor are unknown "
  121.             "to the slave",
  122.             TaskStatus::REASON_RESOURCES_UNKNOWN,
  123.             task.executor().executor_id());
  124.         statusUpdate(update, UPID());
  125.         // Refer to the comment after 'framework->pending.erase' above
  126.         // for why we need this.
  127.         if (framework->executors.empty() && framework->pending.empty()) {
  128.           removeFramework(framework);
  129.         }
  130.         return;
  131.       }
  132.     }
  133.   }
  134.   // NOTE: The slave cannot be in 'RECOVERING' because the task would
  135.   // have been rejected in 'runTask()' in that case.
  136.   CHECK(state == DISCONNECTED || state == RUNNING || state == TERMINATING)
  137.     << state;
  138.   if (state == TERMINATING) {
  139.     LOG(WARNING) << "Ignoring run task " << task.task_id()
  140.                  << " of framework " << frameworkId
  141.                  << " because the slave is terminating";
  142.     // Refer to the comment after 'framework->pending.erase' above
  143.     // for why we need this.
  144.     if (framework->executors.empty() && framework->pending.empty()) {
  145.       removeFramework(framework);
  146.     }
  147.     // We don't send a TASK_LOST here because the slave is
  148.     // terminating.
  149.     return;
  150.   }
  151.   CHECK(framework->state == Framework::RUNNING) << framework->state;
  152.   // Either send the task to an executor or start a new executor
  153.   // and queue the task until the executor has started.
  154.   Executor* executor = framework->getExecutor(executorId);
  155.   if (executor == NULL) {
  156.     executor = framework->launchExecutor(executorInfo, task);
  157.   }
  158.   CHECK_NOTNULL(executor);
  159.   switch (executor->state) {
  160.     case Executor::TERMINATING:
  161.     case Executor::TERMINATED: {
  162.       LOG(WARNING) << "Asked to run task '" << task.task_id()
  163.                    << "' for framework " << frameworkId
  164.                    << " with executor '" << executorId
  165.                    << "' which is terminating/terminated";
  166.       const StatusUpdate update = protobuf::createStatusUpdate(
  167.           frameworkId,
  168.           info.id(),
  169.           task.task_id(),
  170.           TASK_LOST,
  171.           TaskStatus::SOURCE_SLAVE,
  172.           UUID::random(),
  173.           "Executor terminating/terminated",
  174.           TaskStatus::REASON_EXECUTOR_TERMINATED);
  175.       statusUpdate(update, UPID());
  176.       break;
  177.     }
  178.     case Executor::REGISTERING:
  179.       // Checkpoint the task before we do anything else.
  180.       if (executor->checkpoint) {
  181.         executor->checkpointTask(task);
  182.       }
  183.       // Queue task if the executor has not yet registered.
  184.       LOG(INFO) << "Queuing task '" << task.task_id()
  185.                 << "' for executor " << *executor;
  186.       executor->queuedTasks[task.task_id()] = task;
  187.       break;
  188.     case Executor::RUNNING: {
  189.       // Checkpoint the task before we do anything else.
  190.       if (executor->checkpoint) {
  191.         executor->checkpointTask(task);
  192.       }
  193.       // Queue task until the containerizer is updated with new
  194.       // resource limits (MESOS-998).
  195.       LOG(INFO) << "Queuing task '" << task.task_id()
  196.                 << "' for executor " << *executor;
  197.       executor->queuedTasks[task.task_id()] = task;
  198.       // Update the resource limits for the container. Note that the
  199.       // resource limits include the currently queued tasks because we
  200.       // want the container to have enough resources to hold the
  201.       // upcoming tasks.
  202.       Resources resources = executor->resources;
  203.       // TODO(jieyu): Use foreachvalue instead once LinkedHashmap
  204.       // supports it.
  205.       foreach (const TaskInfo& task, executor->queuedTasks.values()) {
  206.         resources += task.resources();
  207.       }
  208.       containerizer->update(executor->containerId, resources)
  209.         .onAny(defer(self(),
  210.                      &Self::runTasks,
  211.                      lambda::_1,
  212.                      frameworkId,
  213.                      executorId,
  214.                      executor->containerId,
  215.                      list<TaskInfo>({task})));
  216.       break;
  217.     }
  218.     default:
  219.       LOG(FATAL) << "Executor " << *executor << " is in unexpected state "
  220.                  << executor->state;
  221.       break;
  222.   }
  223.   // We don't perform the checks for 'removeFramework' here since
  224.   // we're guaranteed by 'launchExecutor' that 'framework->executors'
  225.   // will be non-empty.
  226.   CHECK(!framework->executors.empty());
  227. }

 

在这个函数中,会调用Framework::launchExecutor去运行一个Executor

  1. // Create and launch an executor.
  2. Executor* Framework::launchExecutor(
  3.     const ExecutorInfo& executorInfo,
  4.     const TaskInfo& taskInfo)
  5. {
  6.   // Generate an ID for the executor's container.
  7.   // TODO(idownes) This should be done by the containerizer but we
  8.   // need the ContainerID to create the executor's directory. Fix
  9.   // this when 'launchExecutor()' is handled asynchronously.
  10.   ContainerID containerId;
  11.   containerId.set_value(UUID::random().toString());
  12.   Option<string> user = None();
  13.   // Create a directory for the executor.
  14.   const string directory = paths::createExecutorDirectory(
  15.       slave->flags.work_dir,
  16.       slave->info.id(),
  17.       id(),
  18.       executorInfo.executor_id(),
  19.       containerId,
  20.       user);
  21.   Executor* executor = new Executor(
  22.       slave, id(), executorInfo, containerId, directory, info.checkpoint());
  23.   if (executor->checkpoint) {
  24.     executor->checkpointExecutor();
  25.   }
  26.   CHECK(!executors.contains(executorInfo.executor_id()))
  27.     << "Unknown executor " << executorInfo.executor_id();
  28.   executors[executorInfo.executor_id()] = executor;
  29.   LOG(INFO) << "Launching executor " << executorInfo.executor_id()
  30.             << " of framework " << id()
  31.             << " with resources " << executorInfo.resources()
  32.             << " in work directory '" << directory << "'";
  33.   slave->files->attach(executor->directory, executor->directory)
  34.     .onAny(defer(slave, &Slave::fileAttached, lambda::_1, executor->directory));
  35.   // Tell the containerizer to launch the executor.
  36.   // NOTE: We modify the ExecutorInfo to include the task's
  37.   // resources when launching the executor so that the containerizer
  38.   // has non-zero resources to work with when the executor has
  39.   // no resources. This should be revisited after MESOS-600.
  40.   ExecutorInfo executorInfo_ = executor->info;
  41.   Resources resources = executorInfo_.resources();
  42.   resources += taskInfo.resources();
  43.   executorInfo_.mutable_resources()->CopyFrom(resources);
  44.   // Launch the container.
  45.   Future<bool> launch;
  46.   if (!executor->isCommandExecutor()) {
  47.     // If the executor is _not_ a command executor, this means that
  48.     // the task will include the executor to run. The actual task to
  49.     // run will be enqueued and subsequently handled by the executor
  50.     // when it has registered to the slave.
  51.     launch = slave->containerizer->launch(
  52.         containerId,
  53.         executorInfo_, // Modified to include the task's resources, see above.
  54.         executor->directory,
  55.         user,
  56.         slave->info.id(),
  57.         slave->self(),
  58.         info.checkpoint());
  59.   } else {
  60.     // An executor has _not_ been provided by the task and will
  61.     // instead define a command and/or container to run. Right now,
  62.     // these tasks will require an executor anyway and the slave
  63.     // creates a command executor. However, it is up to the
  64.     // containerizer how to execute those tasks and the generated
  65.     // executor info works as a placeholder.
  66.     // TODO(nnielsen): Obsolete the requirement for executors to run
  67.     // one-off tasks.
  68.     launch = slave->containerizer->launch(
  69.         containerId,
  70.         taskInfo,
  71.         executorInfo_, // Modified to include the task's resources, see above.
  72.         executor->directory,
  73.         user,
  74.         slave->info.id(),
  75.         slave->self(),
  76.         info.checkpoint());
  77.   }
  78.   launch.onAny(defer(slave,
  79.                      &Slave::executorLaunched,
  80.                      id(),
  81.                      executor->id,
  82.                      containerId,
  83.                      lambda::_1));
  84.   // Make sure the executor registers within the given timeout.
  85.   delay(slave->flags.executor_registration_timeout,
  86.         slave,
  87.         &Slave::registerExecutorTimeout,
  88.         id(),
  89.         executor->id,
  90.         containerId);
  91.   return executor;
  92. }

 

会给Task创建一个文件夹,里面保存有Framework和Executor的信息。

最终会调用containerizer->launch。

根据前面Mesos源码分析(7): Mesos-Slave的启动中所说,这里的containerizer是指ComposingContainerizer。

  1. Future<bool> ComposingContainerizer::launch(
  2.     const ContainerID& containerId,
  3.     const TaskInfo& taskInfo,
  4.     const ExecutorInfo& executorInfo,
  5.     const string& directory,
  6.     const Option<string>& user,
  7.     const SlaveID& slaveId,
  8.     const PID<Slave>& slavePid,
  9.     bool checkpoint)
  10. {
  11.   return dispatch(process,
  12.                   &ComposingContainerizerProcess::launch,
  13.                   containerId,
  14.                   taskInfo,
  15.                   executorInfo,
  16.                   directory,
  17.                   user,
  18.                   slaveId,
  19.                   slavePid,
  20.                   checkpoint);
  21. }

 

ComposingContainerizer调用ComposingContainerizerProcess::launch

  1. Future<bool> ComposingContainerizerProcess::launch(
  2.     const ContainerID& containerId,
  3.     const TaskInfo& taskInfo,
  4.     const ExecutorInfo& executorInfo,
  5.     const string& directory,
  6.     const Option<string>& user,
  7.     const SlaveID& slaveId,
  8.     const PID<Slave>& slavePid,
  9.     bool checkpoint)
  10. {
  11.   if (containers_.contains(containerId)) {
  12.     return Failure("Container '" + stringify(containerId) +
  13.                    "' is already launching");
  14.   }
  15.   // Try each containerizer. If none of them handle the
  16.   // TaskInfo/ExecutorInfo then return a Failure.
  17.   vector<Containerizer*>::iterator containerizer = containerizers_.begin();
  18.   Container* container = new Container();
  19.   container->state = LAUNCHING;
  20.   container->containerizer = *containerizer;
  21.   containers_[containerId] = container;
  22.   return (*containerizer)->launch(
  23.       containerId,
  24.       taskInfo,
  25.       executorInfo,
  26.       directory,
  27.       user,
  28.       slaveId,
  29.       slavePid,
  30.       checkpoint)
  31.     .then(defer(self(),
  32.                 &Self::_launch,
  33.                 containerId,
  34.                 taskInfo,
  35.                 executorInfo,
  36.                 directory,
  37.                 user,
  38.                 slaveId,
  39.                 slavePid,
  40.                 checkpoint,
  41.                 containerizer,
  42.                 lambda::_1));
  43. }

 

上面这个函数会从第一个containerizer开始,调用它的launch函数,然后在其完成后回调ComposingContainerizerProcess::_launch函数。

  1. Future<bool> ComposingContainerizerProcess::_launch(
  2.     const ContainerID& containerId,
  3.     const Option<TaskInfo>& taskInfo,
  4.     const ExecutorInfo& executorInfo,
  5.     const string& directory,
  6.     const Option<string>& user,
  7.     const SlaveID& slaveId,
  8.     const PID<Slave>& slavePid,
  9.     bool checkpoint,
  10.     vector<Containerizer*>::iterator containerizer,
  11.     bool launched)
  12. {
  13.   // The container struct won't be cleaned up by destroy because
  14.   // in destroy we only forward the destroy, and wait until the
  15.   // launch returns and clean up here.
  16.   CHECK(containers_.contains(containerId));
  17.   Container* container = containers_[containerId];
  18.   if (container->state == DESTROYED) {
  19.     containers_.erase(containerId);
  20.     delete container;
  21.     return Failure("Container was destroyed while launching");
  22.   }
  23.   if (launched) {
  24.     container->state = LAUNCHED;
  25.     return true;
  26.   }
  27.   // Try the next containerizer.
  28.   ++containerizer;
  29.   if (containerizer == containerizers_.end()) {
  30.     containers_.erase(containerId);
  31.     delete container;
  32.     return false;
  33.   }
  34.   container->containerizer = *containerizer;
  35.   Future<bool> f = taskInfo.isSome() ?
  36.       (*containerizer)->launch(
  37.           containerId,
  38.           taskInfo.get(),
  39.           executorInfo,
  40.           directory,
  41.           user,
  42.           slaveId,
  43.           slavePid,
  44.           checkpoint) :
  45.       (*containerizer)->launch(
  46.           containerId,
  47.           executorInfo,
  48.           directory,
  49.           user,
  50.           slaveId,
  51.           slavePid,
  52.           checkpoint);
  53.   return f.then(
  54.       defer(self(),
  55.             &Self::_launch,
  56.             containerId,
  57.             taskInfo,
  58.             executorInfo,
  59.             directory,
  60.             user,
  61.             slaveId,
  62.             slavePid,
  63.             checkpoint,
  64.             containerizer,
  65.             lambda::_1));
  66. }

 

ComposingContainerizerProcess::_launch函数会在前一个Containerizer启动失败时调用下一个Containerizer的launch函数,直到某个Containerizer启动成功(返回true),或者所有containerizer都尝试完毕(返回false)为止。