Flink 1.1 – ResourceManager


The role of the Flink ResourceManager is shown in the figure below:

[Figure: Flink 1.1 – ResourceManager]

 

FlinkResourceManager

/**
*
* <h1>Worker allocation steps</h1>
*
* <ol>
* <li>The resource manager decides to request more workers. This can happen in order
* to fill the initial pool, or as a result of the JobManager requesting more workers.</li>
*
* <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
* for more containers. After that, the {@link #getNumWorkerRequestsPending()}
* should reflect the pending requests.</li>
*
* <li>The concrete framework may acquire containers and then trigger to start TaskManagers
* in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
*
* <li>At some point, the TaskManager processes will have started and send a registration
* message to the JobManager. The JobManager will perform
* a lookup with the ResourceManager to check if it really started this TaskManager.
* The method {@link #workerStarted(ResourceID)} will be called
* to inform about a registered worker.</li>
* </ol>
*
*/
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to get notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started
     * in a container. This notification is sent by the JM when a TM tries to register at it. */
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that have started successfully; the JM notifies us when such a TM registers with it

    /** The JobManager that the framework master manages resources for */
    private ActorRef jobManager;

    /** Our JobManager's leader session */
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */
    private int designatedPoolSize; // target size of the resource pool

The comment above describes the worker allocation process quite clearly.

As an actor, the ResourceManager mainly handles messages:

@Override
protected void handleMessage(Object message) {
    try {
        // --- messages about worker allocation and pool sizes

        if (message instanceof CheckAndAllocateContainers) {
            checkWorkersPool();
        }
        else if (message instanceof SetWorkerPoolSize) {
            SetWorkerPoolSize msg = (SetWorkerPoolSize) message;
            adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
        }
        else if (message instanceof RemoveResource) {
            RemoveResource msg = (RemoveResource) message;
            removeRegisteredResource(msg.resourceId());
        }

        // --- lookup of registered resources

        else if (message instanceof NotifyResourceStarted) {
            NotifyResourceStarted msg = (NotifyResourceStarted) message;
            handleResourceStarted(sender(), msg.getResourceID());
        }

        // --- messages about JobManager leader status and registration

        else if (message instanceof NewLeaderAvailable) {
            NewLeaderAvailable msg = (NewLeaderAvailable) message;
            newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
        }
        else if (message instanceof TriggerRegistrationAtJobManager) {
            TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;
            triggerConnectingToJobManager(msg.jobManagerAddress());
        }
        else if (message instanceof RegisterResourceManagerSuccessful) {
            RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;
            jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
        }

 

The key ones are:

checkWorkersPool

/**
 * This method causes the resource framework master to <b>synchronously</b> re-examine
 * the set of available and pending workers containers, and allocate containers
 * if needed.
 *
 * This method does not automatically release workers, because it is not visible to
 * this resource master which workers can be released. Instead, the JobManager must
 * explicitly release individual workers.
 */
private void checkWorkersPool() {
    int numWorkersPending = getNumWorkerRequestsPending();
    int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

    // see how many workers we want, and whether we have enough
    int allAvailableAndPending = startedWorkers.size() +
            numWorkersPending + numWorkersPendingRegistration;

    int missing = designatedPoolSize - allAvailableAndPending;

    if (missing > 0) {
        requestNewWorkers(missing); // if the existing workers are not enough, request new ones
    }
}
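The bookkeeping behind this check is simple enough to model in isolation. Below is a minimal, self-contained sketch (not the real FlinkResourceManager; the names startedWorkers and designatedPoolSize are just mirrored from the excerpt above) showing how the deficit is derived from the three counters:

import java.util.HashMap;
import java.util.Map;

// Toy model of the worker-pool bookkeeping behind checkWorkersPool().
// Names mirror the excerpt above; this is an illustration, not Flink code.
public class WorkerPoolModel {

    private final Map<String, Object> startedWorkers = new HashMap<>(); // workers confirmed by the JobManager
    private int numWorkerRequestsPending;       // container requests sent, not yet granted
    private int numWorkersPendingRegistration;  // containers granted, TaskManager not yet registered
    private final int designatedPoolSize;       // target pool size

    public WorkerPoolModel(int designatedPoolSize) {
        this.designatedPoolSize = designatedPoolSize;
    }

    /** Returns how many new workers would have to be requested right now. */
    public int missingWorkers() {
        int allAvailableAndPending = startedWorkers.size()
                + numWorkerRequestsPending
                + numWorkersPendingRegistration;
        return Math.max(0, designatedPoolSize - allAvailableAndPending);
    }

    public static void main(String[] args) {
        WorkerPoolModel pool = new WorkerPoolModel(10);
        pool.numWorkerRequestsPending = 3;                 // 3 containers requested
        pool.numWorkersPendingRegistration = 2;            // 2 containers started, TMs not yet registered
        pool.startedWorkers.put("worker-1", new Object()); // 1 TM fully registered

        // 10 - (1 + 3 + 2) = 4 more workers would be requested
        System.out.println(pool.missingWorkers());
    }
}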

After the JobManager receives a registration message from a TaskManager, it notifies the ResourceManager, which ends up calling handleResourceStarted:

/**
 * Tells the ResourceManager that a TaskManager had been started in a container with the given
 * resource id.
 *
 * @param jobManager The sender (JobManager) of the message
 * @param resourceID The resource id of the started TaskManager
 */
private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
    if (resourceID != null) {
        // check if resourceID is already registered (TaskManager may send duplicate register messages)
        WorkerType oldWorker = startedWorkers.get(resourceID);
        if (oldWorker != null) { // the worker is already known
            LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
        } else {
            WorkerType newWorker = workerStarted(resourceID); // resolve the worker

            if (newWorker != null) {
                startedWorkers.put(resourceID, newWorker); // register the new worker
                LOG.info("TaskManager {} has started.", resourceID);
            } else {
                LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
            }
        }
    }

    // Acknowledge the resource registration
    jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that the registration has been recorded
}
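To get a feel for what workerStarted(ResourceID) does in a concrete framework, here is a purely hypothetical sketch: a concrete resource manager keeps its own map of containers it has launched and resolves the ResourceID against it, returning null for IDs it never started. PendingContainer and launchedContainers are invented names for this illustration; the real YARN/Mesos implementations differ in detail.

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how a concrete framework could back workerStarted(ResourceID).
// PendingContainer and launchedContainers are invented names for illustration only.
public class WorkerStartedSketch {

    static class PendingContainer {
        final String resourceId;
        PendingContainer(String resourceId) { this.resourceId = resourceId; }
    }

    static class Worker {
        final String resourceId;
        Worker(String resourceId) { this.resourceId = resourceId; }
    }

    // containers this resource manager launched and is still waiting to see register
    private final Map<String, PendingContainer> launchedContainers = new HashMap<>();

    /** Mirrors the contract of workerStarted: resolve the id, or return null. */
    Worker workerStarted(String resourceId) {
        PendingContainer pending = launchedContainers.remove(resourceId);
        if (pending == null) {
            // not started by this resource manager -> handleResourceStarted() only logs it
            return null;
        }
        return new Worker(pending.resourceId);
    }

    int getNumWorkersPendingRegistration() {
        return launchedContainers.size();
    }
}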

 

The resource allocation process for a job:

In submitJob, the ExecutionGraph is built,

which eventually calls

executionGraph.scheduleForExecution(scheduler)

Then, in ExecutionGraph:

public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : this.tasks.values()) {
        if (ejv.getJobVertex().isInputVertex()) {
            ejv.scheduleAll(slotProvider, allowQueuedScheduling);
        }
    }
}

Next, in ExecutionJobVertex:

public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}

Then, in ExecutionVertex:

public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}

And finally, in Execution:

public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ?
                new ScheduledUnit(this, sharingGroup) :
                new ScheduledUnit(this, sharingGroup, locationConstraint);

        // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
        //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request a slot asynchronously

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
        // that we directly deploy the tasks if the slot allocation future is completed. This is
        // necessary for immediate deployment.
        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) {
                    try {
                        deployToSlot(simpleSlot); // a slot was allocated, deploy the task to it
                    } catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        } finally {
                            markFailed(t);
                        }
                    }
                }
                else {
                    markFailed(throwable);
                }
                return null;
            }
        });

        return true;
    }

 

This calls slotProvider.allocateSlot, where the slotProvider is the Scheduler:

@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
        throws NoResourceAvailableException {

    final Object ret = scheduleTask(task, allowQueued);
    if (ret instanceof SimpleSlot) {
        return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means the allocation already succeeded, so return a completed future
    }
    else if (ret instanceof Future) {
        return (Future) ret; // a Future means there were not enough resources; the request stays pending and completes asynchronously
    }
    else {
        throw new RuntimeException();
    }
}
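So the caller may get back either an already-completed future (a slot was free right now) or a pending one (the request was queued). The IMPORTANT comment in Execution above relies on exactly this: a synchronous handle runs the deployment callback immediately when the future is already complete. A toy illustration of both paths, using java.util.concurrent.CompletableFuture as a stand-in for Flink's Future/FlinkCompletableFuture:

import java.util.concurrent.CompletableFuture;

// Toy illustration (CompletableFuture standing in for Flink's future types) of why Execution
// uses a synchronous handle: on an already-completed future the callback runs immediately on
// the calling thread; on a queued request it runs later, once the slot arrives.
public class SlotFutureDemo {

    public static void main(String[] args) {
        // Case 1: slot available right away -> completed future, callback runs inline
        CompletableFuture<String> immediate = CompletableFuture.completedFuture("slot-1");
        immediate.handle((slot, failure) -> {
            System.out.println("deploying to " + slot + " on " + Thread.currentThread().getName());
            return null;
        });

        // Case 2: no slot yet -> pending future, deployment happens once it completes
        CompletableFuture<String> queued = new CompletableFuture<>();
        queued.handle((slot, failure) -> {
            if (slot != null) {
                System.out.println("deploying to " + slot);
            } else {
                System.out.println("allocation failed: " + failure);
            }
            return null;
        });

        // ... later, a slot frees up and the queued request is completed
        queued.complete("slot-2");
    }
}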

 

scheduleTask

/**
 * Returns either a {@link SimpleSlot}, or a {@link Future}.
 */
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

    final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

    final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
    final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
            preferredLocations != null && preferredLocations.iterator().hasNext();

    synchronized (globalLock) { // global lock

        SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

        if (sharingUnit != null) { // the task uses slot sharing

            // 1) === If the task has a slot sharing group, schedule with shared slots ===

            final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
            final CoLocationConstraint constraint = task.getLocationConstraint();

            // get a slot from the group, if the group has one for us (and can fulfill the constraint)
            final SimpleSlot slotFromGroup;
            if (constraint == null) {
                slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the existing shared slots
            }
            else {
                slotFromGroup = assignment.getSlotForTask(vertex, constraint);
            }

            SimpleSlot newSlot = null;
            SimpleSlot toUse = null;

            // the following needs to make sure any allocated slot is released in case of an error
            try {
                // check whether the slot from the group is already what we want.
                // any slot that is local, or where the assignment was unconstrained is good!
                if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found
                    updateLocalityCounters(slotFromGroup, vertex);
                    return slotFromGroup; // return the suitable shared slot
                }

                // the group did not have a local slot for us. see if we can get one (or a better one)
                newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to allocate a new slot

                if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                    // if there is no slot from the group, or the new slot is local,
                    // then we use the new slot
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    toUse = newSlot;
                }
                else {
                    // both are available and usable. neither is local. in that case, we may
                    // as well use the slot from the sharing group, to minimize the number of
                    // instances that the job occupies
                    newSlot.releaseSlot();
                    toUse = slotFromGroup;
                }

                // if this is the first slot for the co-location constraint, we lock
                // the location, because we are going to use that slot
                if (constraint != null && !constraint.isAssigned()) {
                    constraint.lockLocation();
                }

                updateLocalityCounters(toUse, vertex);
            } // (catch clauses omitted in this excerpt)

            return toUse; // return the allocated slot
        }
        else { // no slot sharing, the simpler case

            // 2) === schedule without hints and sharing ===

            SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // allocate a slot directly
            if (slot != null) {
                updateLocalityCounters(slot, vertex);
                return slot; // return the slot if allocation succeeded
            }
            else {
                // no resource available now, so queue the request
                if (queueIfNoResource) { // queueing is allowed
                    CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                    this.taskQueue.add(new QueuedTask(task, future)); // queue the task and return the future, i.e. an asynchronous request
                    return future;
                }
            }
        }
    }
}
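The choice between slotFromGroup and newSlot above boils down to a small decision table: a local (or unconstrained) group slot is used directly; otherwise a new slot is requested and kept if it is local or the group had nothing usable, else the non-local group slot wins to keep the job on fewer instances. A minimal condensation of just that decision, with a toy Slot type rather than Flink's classes:

// Toy condensation of the shared-slot choice made in scheduleTask(); Slot and Locality
// here are simplified stand-ins, not Flink's classes.
public class SharedSlotChoice {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    static class Slot {
        final Locality locality;
        final boolean alive;
        Slot(Locality locality, boolean alive) { this.locality = locality; this.alive = alive; }
    }

    /** Mirrors the branch structure above: returns the slot to use (releasing the loser is omitted). */
    static Slot choose(Slot slotFromGroup, Slot newSlot) {
        // a group slot that is LOCAL or UNCONSTRAINED is good enough -> use it directly
        if (slotFromGroup != null && slotFromGroup.locality != Locality.NON_LOCAL) {
            return slotFromGroup;
        }
        // otherwise: take the new slot if the group had nothing usable or the new one is local,
        // else fall back to the group's non-local slot to occupy fewer instances
        if (slotFromGroup == null || !slotFromGroup.alive || newSlot.locality == Locality.LOCAL) {
            return newSlot;
        }
        return slotFromGroup;
    }
}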

Let's look directly at the non-shared-slot case,

which calls getFreeSlotForTask:

/**
 * Gets a suitable instance to schedule the vertex execution to.
 * <p>
 * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
 *
 * @param vertex The task to run.
 * @return The instance to run the vertex on, or {@code null}, if no instance is available.
 */
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                        Iterable<TaskManagerLocation> requestedLocations,
                                        boolean localOnly) {
    // we need potentially to loop multiple times, because there may be false positives
    // in the set-with-available-instances
    while (true) {
        Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance with free slots that satisfies the locality constraint

        if (instanceLocalityPair == null){
            return null; // no suitable instance, allocation fails
        }

        Instance instanceToUse = instanceLocalityPair.getLeft();
        Locality locality = instanceLocalityPair.getRight();

        try {
            SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate a slot from the instance

            // if the instance has further available slots, re-add it to the set of available resources.
            if (instanceToUse.hasResourcesAvailable()) { // the instance still has resources, so keep it in instancesWithAvailableResources for later allocations
                this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
            }

            if (slot != null) {
                slot.setLocality(locality);
                return slot; // success, return the slot
            }
        }
        catch (InstanceDiedException e) {
            // the instance died it has not yet been propagated to this scheduler
            // remove the instance from the set of available instances
            removeInstance(instanceToUse);
        }

        // if we failed to get a slot, fall through the loop
    }
}

 

findInstance

/**
 * Tries to find a requested instance. If no such instance is available it will return a non-
 * local instance. If no such instance exists (all slots occupied), then return null.
 *
 * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
 *
 * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
 *                           no locality preference exists.
 * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.
 */
private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

    // drain the queue of newly available instances
    while (this.newlyAvailableInstances.size() > 0) { // move newlyAvailableInstances into instancesWithAvailableResources
        Instance queuedInstance = this.newlyAvailableInstances.poll();
        if (queuedInstance != null) {
            this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
        }
    }

    // if nothing is available at all, return null
    if (this.instancesWithAvailableResources.isEmpty()) { // no instance has available resources, fail immediately
        return null;
    }

    Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

    if (locations != null && locations.hasNext()) { // walk through the locality preferences in order
        // we have a locality preference

        while (locations.hasNext()) {
            TaskManagerLocation location = locations.next();

            if (location != null) {
                Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
                if (instance != null) {
                    return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                }
            }
        }

        // no local instance available
        if (localOnly) {
            return null;
        }
        else {
            // take the first instance from the instances with resources
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
            Instance instanceToUse = instances.next();
            instances.remove();

            return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
        }
    }
    else {
        // no location preference, so use some instance
        Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
        Instance instanceToUse = instances.next();
        instances.remove();

        return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
    }
}
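To make the three Locality outcomes concrete, here is a small usage-style sketch of the same selection order, with plain strings standing in for Instance and TaskManagerLocation: a preferred location that is available yields LOCAL; preferences given but none available yields NON_LOCAL (or null if localOnly); no preference at all yields UNCONSTRAINED.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified re-statement of findInstance()'s selection order, for illustration only.
public class LocalityDemo {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    static Map.Entry<String, Locality> pick(Map<String, String> available,
                                            List<String> preferred,
                                            boolean localOnly) {
        if (available.isEmpty()) {
            return null;
        }
        if (preferred != null && !preferred.isEmpty()) {
            for (String location : preferred) {
                String instance = available.remove(location);
                if (instance != null) {
                    return Map.entry(instance, Locality.LOCAL);
                }
            }
            if (localOnly) {
                return null; // only exact local instances were acceptable
            }
            return takeAny(available, Locality.NON_LOCAL);
        }
        return takeAny(available, Locality.UNCONSTRAINED);
    }

    static Map.Entry<String, Locality> takeAny(Map<String, String> available, Locality locality) {
        Iterator<String> it = available.values().iterator();
        String instance = it.next();
        it.remove();
        return Map.entry(instance, locality);
    }

    public static void main(String[] args) {
        Map<String, String> available = new LinkedHashMap<>();
        available.put("tm-A", "instance-A");
        available.put("tm-B", "instance-B");
        available.put("tm-C", "instance-C");

        System.out.println(pick(available, List.of("tm-B"), false)); // instance-B=LOCAL
        System.out.println(pick(available, List.of("tm-Z"), false)); // instance-A=NON_LOCAL
        System.out.println(pick(available, null, false));            // instance-C=UNCONSTRAINED
    }
}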

 

Next question: where does newlyAvailableInstances come from?

@Override
public void newInstanceAvailable(Instance instance) {

    // synchronize globally for instance changes
    synchronized (this.globalLock) {

        try {
            // make sure we get notifications about slots becoming available
            instance.setSlotAvailabilityListener(this); // register the Scheduler as the instance's SlotAvailabilityListener

            // store the instance in the by-host-lookup
            String instanceHostName = instance.getTaskManagerLocation().getHostname();
            Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
            if (instanceSet == null) {
                instanceSet = new HashSet<Instance>();
                allInstancesByHost.put(instanceHostName, instanceSet);
            }
            instanceSet.add(instance);

            // add it to the available resources and let potential waiters know
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // put it into instancesWithAvailableResources

            // add all slots as available
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                newSlotAvailable(instance);
            }
        } // (catch omitted in this excerpt)
    }
}

 

@Override
public void newSlotAvailable(final Instance instance) {

    // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
    // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
    //
    // -> The scheduler needs to grab them (1) global scheduler lock
    //                                     (2) slot/instance lock
    // -> The slot releasing grabs (1) slot/instance (for releasing) and
    //                             (2) scheduler (to check whether to take a new task item
    //
    // that leads with a high probability to deadlocks, when scheduling fast

    this.newlyAvailableInstances.add(instance); // add the instance to newlyAvailableInstances

    Futures.future(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            handleNewSlot(); // asynchronously serve the queued tasks: when a new slot shows up, run a queued task on it
            return null;
        }
    }, executionContext);
}
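newSlotAvailable only enqueues the instance and then asynchronously triggers handleNewSlot, which is what eventually completes the futures that scheduleTask queued earlier (the taskQueue of QueuedTask objects we saw above). A rough, self-contained sketch of that hand-off pattern, with toy types and java.util.concurrent.CompletableFuture instead of Flink's classes:

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Rough sketch of the queue/complete hand-off between scheduleTask() and handleNewSlot();
// QueuedTask, Slot and the scheduler itself are toy stand-ins, not Flink's classes.
public class QueuedSchedulingSketch {

    static class Slot { final String id; Slot(String id) { this.id = id; } }

    static class QueuedTask {
        final String taskName;
        final CompletableFuture<Slot> future;
        QueuedTask(String taskName, CompletableFuture<Slot> future) {
            this.taskName = taskName;
            this.future = future;
        }
    }

    private final Queue<QueuedTask> taskQueue = new ArrayDeque<>();
    private final Queue<Slot> freeSlots = new ArrayDeque<>();

    /** scheduleTask(): no slot available -> park the request and hand back a future. */
    CompletableFuture<Slot> scheduleQueued(String taskName) {
        CompletableFuture<Slot> future = new CompletableFuture<>();
        taskQueue.add(new QueuedTask(taskName, future));
        return future;
    }

    /** handleNewSlot(): a slot became free -> give it to the oldest queued request. */
    void newSlotAvailable(Slot slot) {
        QueuedTask queued = taskQueue.poll();
        if (queued != null) {
            queued.future.complete(slot);   // this unblocks the deployment callback in Execution
        } else {
            freeSlots.add(slot);            // nobody is waiting, keep the slot for later
        }
    }

    public static void main(String[] args) {
        QueuedSchedulingSketch scheduler = new QueuedSchedulingSketch();
        scheduler.scheduleQueued("map-task")
                 .thenAccept(slot -> System.out.println("deploying map-task to " + slot.id));
        scheduler.newSlotAvailable(new Slot("slot-7")); // completes the queued future
    }
}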

 

newInstanceAvailable, in turn, is called from the InstanceManager:

private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener listener : this.instanceListeners) {
            try {
                listener.newInstanceAvailable(instance);
            }
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);
            }
        }
    }
}

notifyNewInstance is called from registerTaskManager,

and registerTaskManager is invoked in the JobManager when a TaskManager registers:

case msg @ RegisterTaskManager(
      resourceId,
      connectionInfo,
      hardwareInformation,
      numberOfSlots) =>

  val taskManager = sender()

  currentResourceManager match {
    case Some(rm) => // a ResourceManager is connected
      // notify the ResourceManager that this resource has started successfully
      val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout)
  }

  // ResourceManager is told about the resource, now let's try to register TaskManager
  if (instanceManager.isRegistered(resourceId)) { // already registered
    val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

    taskManager ! decorateMessage(
      AlreadyRegistered(
        instanceID,
        libraryCacheManager.getBlobServerPort))
  } else { // a new resource
    try {
      val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
      val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

      // register the TaskManager with the InstanceManager
      val instanceID = instanceManager.registerTaskManager(
        taskManagerGateway,
        connectionInfo,
        hardwareInformation,
        numberOfSlots)

      taskManagerMap.put(taskManager, instanceID) // record the TaskManager in the JobManager

      // tell the TaskManager that registration succeeded
      taskManager ! decorateMessage(
        AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort))

      // to be notified when the taskManager is no longer reachable
      context.watch(taskManager)
    } // (catch omitted in this excerpt)
  }

 

This version does not yet implement the architecture shown in the figure.

The TaskManager still registers with the JobManager, and the JobManager then notifies the ResourceManager.

For now, the ResourceManager only plays a bookkeeping role.

The ResourceManager has not been separated out from the JobManager.

The architecture is still the one shown below:

[Figure: Flink 1.1 – ResourceManager]