Mesos源码分析(11): Mesos-Master接收到launchTasks消息

时间:2024-04-02 18:36:22

根据《Mesos源码分析(6): Mesos Master的初始化》中的代码分析，当Mesos-Master接收到launchTasks消息时，会调用Master::launchTasks函数进行处理。

 

  1. void Master::launchTasks(
  2.     const UPID& from,
  3.     const FrameworkID& frameworkId,
  4.     const vector<TaskInfo>& tasks,
  5.     const Filters& filters,
  6.     const vector<OfferID>& offerIds)
  7. {
  8.   Framework* framework = getFramework(frameworkId);
  9.   if (framework == NULL) {
  10.     LOG(WARNING)
  11.       << "Ignoring launch tasks message for offers " << stringify(offerIds)
  12.       << " of framework " << frameworkId
  13.       << " because the framework cannot be found";
  14.     return;
  15.   }
  16.   if (framework->pid != from) {
  17.     LOG(WARNING)
  18.       << "Ignoring launch tasks message for offers " << stringify(offerIds)
  19.       << " from '" << from << "' because it is not from the"
  20.       << " registered framework " << *framework;
  21.     return;
  22.   }
  23.   // Currently when no tasks are specified in the launchTasks message
  24.   // it is implicitly considered a decline of the offers.
  25.   if (!tasks.empty()) {
  26.     scheduler::Call::Accept message;
  27.     message.mutable_filters()->CopyFrom(filters);
  28.     Offer::Operation* operation = message.add_operations();
  29.     operation->set_type(Offer::Operation::LAUNCH);
  30.     foreach (const TaskInfo& task, tasks) {
  31.       operation->mutable_launch()->add_task_infos()->CopyFrom(task);
  32.     }
  33.     foreach (const OfferID& offerId, offerIds) {
  34.       message.add_offer_ids()->CopyFrom(offerId);
  35.     }
  36.     accept(framework, message);
  37.   } else {
  38.     scheduler::Call::Decline message;
  39.     message.mutable_filters()->CopyFrom(filters);
  40.     foreach (const OfferID& offerId, offerIds) {
  41.       message.add_offer_ids()->CopyFrom(offerId);
  42.     }
  43.     decline(framework, message);
  44.   }
  45. }

 

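Master::launchTasks先检查该framework是否存在，以及消息是否来自已注册的framework。如果任务列表不为空，它就把所有任务封装成一个scheduler::Call::Accept消息中的LAUNCH操作，然后进一步调用accept函数；如果任务列表为空，则视为拒绝这些offer，封装成scheduler::Call::Decline消息并调用decline函数。accept函数如下：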

  1. void Master::accept(
  2.     Framework* framework,
  3.     const scheduler::Call::Accept& accept)
  4. {
  5.   CHECK_NOTNULL(framework);
  6.   foreach (const Offer::Operation& operation, accept.operations()) {
  7.     if (operation.type() == Offer::Operation::LAUNCH) {
  8.       if (operation.launch().task_infos().size() > 0) {
  9.         ++metrics->messages_launch_tasks;
  10.       } else {
  11.         ++metrics->messages_decline_offers;
  12.       }
  13.     }
  14.     // TODO(jieyu): Add metrics for non launch operations.
  15.   }
  16.   // TODO(bmahler): We currently only support using multiple offers
  17.   // for a single slave.
  18.   Resources offeredResources;
  19.   Option<SlaveID> slaveId = None();
  20.   Option<Error> error = None();
  21.   if (accept.offer_ids().size() == 0) {
  22.     error = Error("No offers specified");
  23.   } else {
  24.     // Validate the offers.
  25.     error = validation::offer::validate(accept.offer_ids(), this, framework);
  26.     // Compute offered resources and remove the offers. If the
  27.     // validation failed, return resources to the allocator.
  28.     foreach (const OfferID& offerId, accept.offer_ids()) {
  29.       Offer* offer = getOffer(offerId);
  30.       // Since we re-use `OfferID`s, it is possible to arrive here with either
  31.       // a resource offer, or an inverse offer. We first try as a resource offer
  32.       // and if that fails, then we assume it is an inverse offer. This is
  33.       // correct as those are currently the only 2 ways to get an `OfferID`.
  34.       if (offer != NULL) {
  35.         slaveId = offer->slave_id();
  36.         offeredResources += offer->resources();
  37.         if (error.isSome()) {
  38.           allocator->recoverResources(
  39.               offer->framework_id(),
  40.               offer->slave_id(),
  41.               offer->resources(),
  42.               None());
  43.         }
  44.         removeOffer(offer);
  45.         continue;
  46.       }
  47.       // Try it as an inverse offer. If this fails then the offer is no longer
  48.       // valid.
  49.       InverseOffer* inverseOffer = getInverseOffer(offerId);
  50.       if (inverseOffer != NULL) {
  51.         mesos::master::InverseOfferStatus status;
  52.         status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
  53.         status.mutable_framework_id()->CopyFrom(inverseOffer->framework_id());
  54.         status.mutable_timestamp()->CopyFrom(protobuf::getCurrentTime());
  55.         allocator->updateInverseOffer(
  56.             inverseOffer->slave_id(),
  57.             inverseOffer->framework_id(),
  58.             UnavailableResources{
  59.                 inverseOffer->resources(),
  60.                 inverseOffer->unavailability()},
  61.             status,
  62.             accept.filters());
  63.         removeInverseOffer(inverseOffer);
  64.         continue;
  65.       }
  66.       // If the offer was neither in our offer or inverse offer sets, then this
  67.       // offer is no longer valid.
  68.       LOG(WARNING) << "Ignoring accept of offer " << offerId
  69.                    << " since it is no longer valid";
  70.     }
  71.   }
  72.   // If invalid, send TASK_LOST for the launch attempts.
  73.   // TODO(jieyu): Consider adding a 'drop' overload for ACCEPT call to
  74.   // consistently handle message dropping. It would be ideal if the
  75.   // 'drop' overload can handle both resource recovery and lost task
  76.   // notifications.
  77.   if (error.isSome()) {
  78.     LOG(WARNING) << "ACCEPT call used invalid offers '" << accept.offer_ids()
  79.                  << "': " << error.get().message;
  80.     foreach (const Offer::Operation& operation, accept.operations()) {
  81.       if (operation.type() != Offer::Operation::LAUNCH) {
  82.         continue;
  83.       }
  84.       foreach (const TaskInfo& task, operation.launch().task_infos()) {
  85.         const StatusUpdate& update = protobuf::createStatusUpdate(
  86.             framework->id(),
  87.             task.slave_id(),
  88.             task.task_id(),
  89.             TASK_LOST,
  90.             TaskStatus::SOURCE_MASTER,
  91.             None(),
  92.             "Task launched with invalid offers: " + error.get().message,
  93.             TaskStatus::REASON_INVALID_OFFERS);
  94.         metrics->tasks_lost++;
  95.         metrics->incrementTasksStates(
  96.             TASK_LOST,
  97.             TaskStatus::SOURCE_MASTER,
  98.             TaskStatus::REASON_INVALID_OFFERS);
  99.         forward(update, UPID(), framework);
  100.       }
  101.     }
  102.     return;
  103.   }
  104.   CHECK_SOME(slaveId);
  105.   Slave* slave = slaves.registered.get(slaveId.get());
  106.   CHECK_NOTNULL(slave);
  107.   LOG(INFO) << "Processing ACCEPT call for offers: " << accept.offer_ids()
  108.             << " on slave " << *slave << " for framework " << *framework;
  109.   list<Future<bool>> futures;
  110.   foreach (const Offer::Operation& operation, accept.operations()) {
  111.     switch (operation.type()) {
  112.       case Offer::Operation::LAUNCH: {
  113.         // Authorize the tasks. A task is in 'framework->pendingTasks'
  114.         // before it is authorized.
  115.         foreach (const TaskInfo& task, operation.launch().task_infos()) {
  116.           futures.push_back(authorizeTask(task, framework));
  117.           // Add to pending tasks.
  118.           //
  119.           // NOTE: The task ID here hasn't been validated yet, but it
  120.           // doesn't matter. If the task ID is not valid, the task won't
  121.           // be launched anyway. If two tasks have the same ID, the second
  122.           // one will not be put into 'framework->pendingTasks', therefore
  123.           // will not be launched.
  124.           if (!framework->pendingTasks.contains(task.task_id())) {
  125.             framework->pendingTasks[task.task_id()] = task;
  126.           }
  127.         }
  128.         break;
  129.       }
  130.       // NOTE: When handling RESERVE and UNRESERVE operations, authorization
  131.       // will proceed even if no principal is specified, although currently
  132.       // resources cannot be reserved or unreserved unless a principal is
  133.       // provided. Any RESERVE/UNRESERVE operation with no associated principal
  134.       // will be found invalid when `validate()` is called in `_accept()` below.
  135.       // The RESERVE operation allows a principal to reserve resources.
  136.       case Offer::Operation::RESERVE: {
  137.         Option<string> principal = framework->info.has_principal()
  138.           ? framework->info.principal()
  139.           : Option<string>::none();
  140.         futures.push_back(
  141.             authorizeReserveResources(
  142.                 operation.reserve(), principal));
  143.         break;
  144.       }
  145.       // The UNRESERVE operation allows a principal to unreserve resources.
  146.       case Offer::Operation::UNRESERVE: {
  147.         Option<string> principal = framework->info.has_principal()
  148.           ? framework->info.principal()
  149.           : Option<string>::none();
  150.         futures.push_back(
  151.             authorizeUnreserveResources(
  152.                 operation.unreserve(), principal));
  153.         break;
  154.       }
  155.       // The CREATE operation allows the creation of a persistent volume.
  156.       case Offer::Operation::CREATE: {
  157.         Option<string> principal = framework->info.has_principal()
  158.           ? framework->info.principal()
  159.           : Option<string>::none();
  160.         futures.push_back(
  161.             authorizeCreateVolume(
  162.                 operation.create(), principal));
  163.         break;
  164.       }
  165.       // The DESTROY operation allows the destruction of a persistent volume.
  166.       case Offer::Operation::DESTROY: {
  167.         Option<string> principal = framework->info.has_principal()
  168.           ? framework->info.principal()
  169.           : Option<string>::none();
  170.         futures.push_back(
  171.             authorizeDestroyVolume(
  172.                 operation.destroy(), principal));
  173.         break;
  174.       }
  175.     }
  176.   }
  177.   // Wait for all the tasks to be authorized.
  178.   await(futures)
  179.     .onAny(defer(self(),
  180.                  &Master::_accept,
  181.                  framework->id(),
  182.                  slaveId.get(),
  183.                  offeredResources,
  184.                  accept,
  185.                  lambda::_1));
  186. }

 

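accept函数会为每个待启动的任务，以及每个RESERVE、UNRESERVE、CREATE、DESTROY操作，分别发起鉴权请求。所有鉴权请求都完成后（无论鉴权是否通过，见await(futures).onAny(...)），会调用Master::_accept。由_accept逐个检查鉴权结果，并执行相应的操作：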

  1. void Master::_accept(
  2.     const FrameworkID& frameworkId,
  3.     const SlaveID& slaveId,
  4.     const Resources& offeredResources,
  5.     const scheduler::Call::Accept& accept,
  6.     const Future<list<Future<bool>>>& _authorizations)
  7. {
  8.   Framework* framework = getFramework(frameworkId);
  9. ……
  10.   Slave* slave = slaves.registered.get(slaveId);
  11.   if (slave == NULL || !slave->connected) {
  12.     foreach (const Offer::Operation& operation, accept.operations()) {
  13.       if (operation.type() != Offer::Operation::LAUNCH) {
  14.         continue;
  15.       }
  16.       foreach (const TaskInfo& task, operation.launch().task_infos()) {
  17.         const TaskStatus::Reason reason =
  18.             slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
  19.                           : TaskStatus::REASON_SLAVE_DISCONNECTED;
  20.         const StatusUpdate& update = protobuf::createStatusUpdate(
  21.             framework->id(),
  22.             task.slave_id(),
  23.             task.task_id(),
  24.             TASK_LOST,
  25.             TaskStatus::SOURCE_MASTER,
  26.             None(),
  27.             slave == NULL ? "Slave removed" : "Slave disconnected",
  28.             reason);
  29.         metrics->tasks_lost++;
  30.         metrics->incrementTasksStates(
  31.             TASK_LOST,
  32.             TaskStatus::SOURCE_MASTER,
  33.             reason);
  34.         forward(update, UPID(), framework);
  35.       }
  36.     }
  37.     // Tell the allocator about the recovered resources.
  38.     allocator->recoverResources(
  39.         frameworkId,
  40.         slaveId,
  41.         offeredResources,
  42.         None());
  43.     return;
  44.   }
  45.   // Some offer operations update the offered resources. We keep
  46.   // updated offered resources here. When a task is successfully
  47.   // launched, we remove its resource from offered resources.
  48.   Resources _offeredResources = offeredResources;
  49.   // The order of `authorizations` must match the order of the operations in
  50.   // `accept.operations()`, as they are iterated through simultaneously.
  51.   CHECK_READY(_authorizations);
  52.   list<Future<bool>> authorizations = _authorizations.get();
  53.   foreach (const Offer::Operation& operation, accept.operations()) {
  54.     switch (operation.type()) {
  55.       // The RESERVE operation allows a principal to reserve resources.
  56.       case Offer::Operation::RESERVE: {
  57.         Future<bool> authorization = authorizations.front();
  58.         authorizations.pop_front();
  59.         CHECK(!authorization.isDiscarded());
  60.         if (authorization.isFailed()) {
  61.           // TODO(greggomann): We may want to retry this failed authorization
  62.           // request rather than dropping it immediately.
  63.           drop(framework,
  64.                operation,
  65.                "Authorization of principal '" + framework->info.principal() +
  66.                "' to reserve resources failed: " +
  67.                authorization.failure());
  68.           continue;
  69.         } else
    if (!authorization.get()) {
  70.           drop(framework,
  71.                operation,
  72.                "Not authorized to reserve resources as '" +
  73.                  framework->info.principal() + "'");
  74.           continue;
  75.         }
  76.         Option<string> principal = framework->info.has_principal()
  77.           ? framework->info.principal()
  78.           : Option<string>::none();
  79.         // Make sure this reserve operation is valid.
  80.         Option<Error> error = validation::operation::validate(
  81.             operation.reserve(), principal);
  82.         if (error.isSome()) {
  83.           drop(framework, operation, error.get().message);
  84.           continue;
  85.         }
  86.         // Test the given operation on the included resources.
  87.         Try<Resources> resources = _offeredResources.apply(operation);
  88.         if (resources.isError()) {
  89.           drop(framework, operation, resources.error());
  90.           continue;
  91.         }
  92.         _offeredResources = resources.get();
  93.         LOG(INFO) << "Applying RESERVE operation for resources "
  94.                   << operation.reserve().resources() << " from framework "
  95.                   << *framework << " to slave " << *slave;
  96.         apply(framework, slave, operation);
  97.         break;
  98.       }
  99.       // The UNRESERVE operation allows a principal to unreserve resources.
  100.       case Offer::Operation::UNRESERVE: {
  101.         Future<bool> authorization = authorizations.front();
  102.         authorizations.pop_front();
  103.         CHECK(!authorization.isDiscarded());
  104.         if (authorization.isFailed()) {
  105.           // TODO(greggomann): We may want to retry this failed authorization
  106.           // request rather than dropping it immediately.
  107.           drop(framework,
  108.                operation,
  109.                "Authorization of principal '" + framework->info.principal() +
  110.                "' to unreserve resources failed: " +
  111.                authorization.failure());
  112.           continue;
  113.         } else
    if (!authorization.get()) {
  114.           drop(framework,
  115.                operation,
  116.                "Not authorized to unreserve resources as '" +
  117.                  framework->info.principal() + "'");
  118.           continue;
  119.         }
  120.         // Make sure this unreserve operation is valid.
  121.         Option<Error> error = validation::operation::validate(
  122.             operation.unreserve());
  123.         if (error.isSome()) {
  124.           drop(framework, operation, error.get().message);
  125.           continue;
  126.         }
  127.         // Test the given operation on the included resources.
  128.         Try<Resources> resources = _offeredResources.apply(operation);
  129.         if (resources.isError()) {
  130.           drop(framework, operation, resources.error());
  131.           continue;
  132.         }
  133.         _offeredResources = resources.get();
  134.         LOG(INFO) << "Applying UNRESERVE operation for resources "
  135.                   << operation.unreserve().resources() << " from framework "
  136.                   << *framework << " to slave " << *slave;
  137.         apply(framework, slave, operation);
  138.         break;
  139.       }
  140.       case Offer::Operation::CREATE: {
  141.         Future<bool> authorization = authorizations.front();
  142.         authorizations.pop_front();
  143.         CHECK(!authorization.isDiscarded());
  144.         if (authorization.isFailed()) {
  145.           // TODO(greggomann): We may want to retry this failed authorization
  146.           // request rather than dropping it immediately.
  147.           drop(framework,
  148.                operation,
  149.                "Authorization of principal '" + framework->info.principal() +
  150.                "' to create persistent volumes failed: " +
  151.                authorization.failure());
  152.           continue;
  153.         } else
    if (!authorization.get()) {
  154.           drop(framework,
  155.                operation,
  156.                "Not authorized to create persistent volumes as '" +
  157.                  framework->info.principal() + "'");
  158.           continue;
  159.         }
  160.         // Make sure this create operation is valid.
  161.         Option<Error> error = validation::operation::validate(
  162.             operation.create(), slave->checkpointedResources);
  163.         if (error.isSome()) {
  164.           drop(framework, operation, error.get().message);
  165.           continue;
  166.         }
  167.         Try<Resources> resources = _offeredResources.apply(operation);
  168.         if (resources.isError()) {
  169.           drop(framework, operation, resources.error());
  170.           continue;
  171.         }
  172.         _offeredResources = resources.get();
  173.         LOG(INFO) << "Applying CREATE operation for volumes "
  174.                   << operation.create().volumes() << " from framework "
  175.                   << *framework << " to slave " << *slave;
  176.         apply(framework, slave, operation);
  177.         break;
  178.       }
  179.       case Offer::Operation::DESTROY: {
  180.         Future<bool> authorization = authorizations.front();
  181.         authorizations.pop_front();
  182.         CHECK(!authorization.isDiscarded());
  183.         if (authorization.isFailed()) {
  184.           // TODO(greggomann): We may want to retry this failed authorization
  185.           // request rather than dropping it immediately.
  186.           drop(framework,
  187.                operation,
  188.                "Authorization of principal '" + framework->info.principal() +
  189.                "' to destroy persistent volumes failed: " +
  190.                authorization.failure());
  191.           continue;
  192.         } else
    if (!authorization.get()) {
  193.           drop(framework,
  194.                operation,
  195.                "Not authorized to destroy persistent volumes as '" +
  196.                  framework->info.principal() + "'");
  197.           continue;
  198.         }
  199.         // Make sure this destroy operation is valid.
  200.         Option<Error> error = validation::operation::validate(
  201.             operation.destroy(), slave->checkpointedResources);
  202.         if (error.isSome()) {
  203.           drop(framework, operation, error.get().message);
  204.           continue;
  205.         }
  206.         Try<Resources> resources = _offeredResources.apply(operation);
  207.         if (resources.isError()) {
  208.           drop(framework, operation, resources.error());
  209.           continue;
  210.         }
  211.         _offeredResources = resources.get();
  212.         LOG(INFO) << "Applying DESTROY operation for volumes "
  213.                   << operation.create().volumes() << " from framework "
  214.                   << *framework << " to slave " << *slave;
  215.         apply(framework, slave, operation);
  216.         break;
  217.       }
  218.       case Offer::Operation::LAUNCH: {
  219.         foreach (const TaskInfo& task, operation.launch().task_infos()) {
  220.           Future<bool> authorization = authorizations.front();
  221.           authorizations.pop_front();
  222.           // NOTE: The task will not be in 'pendingTasks' if
  223.           // 'killTask()' for the task was called before we are here.
  224.           // No need to launch the task if it's no longer pending.
  225.           // However, we still need to check the authorization result
  226.           // and do the validation so that we can send status update
  227.           // in case the task has duplicated ID.
  228.           bool pending = framework->pendingTasks.contains(task.task_id());
  229.           // Remove from pending tasks.
  230.           framework->pendingTasks.erase(task.task_id());
  231.           CHECK(!authorization.isDiscarded());
  232.           if (authorization.isFailed() || !authorization.get()) {
  233.             string user = framework->info.user(); // Default user.
  234.             if (task.has_command() && task.command().has_user()) {
  235.               user = task.command().user();
  236.             } else
    if (task.has_executor() &&
  237.                        task.executor().command().has_user()) {
  238.               user = task.executor().command().user();
  239.             }
  240.             const StatusUpdate& update = protobuf::createStatusUpdate(
  241.                 framework->id(),
  242.                 task.slave_id(),
  243.                 task.task_id(),
  244.                 TASK_ERROR,
  245.                 TaskStatus::SOURCE_MASTER,
  246.                 None(),
  247.                 authorization.isFailed() ?
  248.                     "Authorization failure: " + authorization.failure() :
  249.                     "Not authorized to launch as user '" + user + "'",
  250.                 TaskStatus::REASON_TASK_UNAUTHORIZED);
  251.             metrics->tasks_error++;
  252.             metrics->incrementTasksStates(
  253.                 TASK_ERROR,
  254.                 TaskStatus::SOURCE_MASTER,
  255.                 TaskStatus::REASON_TASK_UNAUTHORIZED);
  256.             forward(update, UPID(), framework);
  257.             continue;
  258.           }
  259.           // Validate the task.
  260.           // Make a copy of the original task so that we can
  261.           // fill the missing `framework_id` in ExecutorInfo
  262.           // if needed. This field was added to the API later
  263.           // and thus was made optional.
  264.           TaskInfo task_(task);
  265.           if (task.has_executor() && !task.executor().has_framework_id()) {
  266.             task_.mutable_executor()
  267.                 ->mutable_framework_id()->CopyFrom(framework->id());
  268.           }
  269.           const Option<Error>& validationError = validation::task::validate(
  270.               task_,
  271.               framework,
  272.               slave,
  273.               _offeredResources);
  274.           if (validationError.isSome()) {
  275.             const StatusUpdate& update = protobuf::createStatusUpdate(
  276.                 framework->id(),
  277.                 task_.slave_id(),
  278.                 task_.task_id(),
  279.                 TASK_ERROR,
  280.                 TaskStatus::SOURCE_MASTER,
  281.                 None(),
  282.                 validationError.get().message,
  283.                 TaskStatus::REASON_TASK_INVALID);
  284.             metrics->tasks_error++;
  285.             metrics->incrementTasksStates(
  286.                 TASK_ERROR,
  287.                 TaskStatus::SOURCE_MASTER,
  288.                 TaskStatus::REASON_TASK_INVALID);
  289.             forward(update, UPID(), framework);
  290.             continue;
  291.           }
  292.           // Add task.
  293.           if (pending) {
  294.             _offeredResources -= addTask(task_, framework, slave);
  295.             // TODO(bmahler): Consider updating this log message to
  296.             // indicate when the executor is also being launched.
  297.             LOG(INFO) << "Launching task " << task_.task_id()
  298.                       << " of framework " << *framework
  299.                       << " with resources " << task_.resources()
  300.                       << " on slave " << *slave;
  301.             RunTaskMessage message;
  302.             message.mutable_framework()->MergeFrom(framework->info);
  303.             // TODO(anand): We set 'pid' to UPID() for http frameworks
  304.             // as 'pid' was made optional in 0.24.0. In 0.25.0, we
  305.             // no longer have to set pid here for http frameworks.
  306.             message.set_pid(framework->pid.getOrElse(UPID()));
  307.             message.mutable_task()->MergeFrom(task_);
  308.             if (HookManager::hooksAvailable()) {
  309.               // Set labels retrieved from label-decorator hooks.
  310.               message.mutable_task()->mutable_labels()->CopyFrom(
  311.                   HookManager::masterLaunchTaskLabelDecorator(
  312.                       task_,
  313.                       framework->info,
  314.                       slave->info));
  315.             }
  316.             send(slave->pid, message);
  317.           }
  318.         }
  319.         break;
  320.       }
  321.       default:
  322.         LOG(ERROR) << "Unsupported offer operation " << operation.type();
  323.         break;
  324.     }
  325.   }
  326.   if (!_offeredResources.empty()) {
  327.     // Tell the allocator about the unused (e.g., refused) resources.
  328.     allocator->recoverResources(
  329.         frameworkId,
  330.         slaveId,
  331.         _offeredResources,
  332.         accept.filters());
  333.   }
  334. }

 

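对于鉴权和校验都通过、且仍处于pending状态的任务，Mesos-Master会通过send(slave->pid, message)将RunTaskMessage消息发送给Mesos-Slave。没有被任何操作使用的剩余资源，则通过allocator->recoverResources归还给分配器。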