package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceDiedException;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.instance.SharedSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/Scheduler.class */
public class Scheduler implements InstanceListener, SlotAvailabilityListener, SlotProvider {
    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);

    /** Monitor the methods of this class synchronize on before touching the collections below. */
    private final Object globalLock = new Object();

    /** Every instance added through {@code newInstanceAvailable} and not yet removed. */
    private final Set<Instance> allInstances = new HashSet<Instance>();

    /** The registered instances grouped by the hostname of their task manager location. */
    private final Map<String, Set<Instance>> allInstancesByHost = new HashMap<String, Set<Instance>>();

    /**
     * Instances that are candidates for slot allocation, keyed by task manager ID. A linked map is
     * used, so {@code findInstance} picks candidates in insertion order when no location matches.
     */
    private final Map<ResourceID, Instance> instancesWithAvailableResources = new LinkedHashMap<ResourceID, Instance>();

    /** Queued scheduling requests that are served by {@code handleNewSlot}. */
    private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();

    /** Instances reported through {@code newSlotAvailable} that have not been processed yet. */
    private final BlockingQueue<Instance> newlyAvailableInstances = new LinkedBlockingQueue<Instance>();

    /** Number of assignments counted with locality UNCONSTRAINED (see {@code updateLocalityCounters}). */
    private int unconstrainedAssignments;

    /** Number of assignments counted with locality LOCAL. */
    private int localizedAssignments;

    /** Number of assignments counted with locality NON_LOCAL. */
    private int nonLocalizedAssignments;

    /** Executor on which the {@code handleNewSlot} callbacks are run. */
    private final Executor executor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/Scheduler$QueuedTask.class */
    /**
     * Immutable pairing of a task that is waiting for a slot with the future that is completed
     * once a slot has been found for it.
     */
    public static final class QueuedTask {
        /** The unit of work that is waiting for a slot. */
        private final ScheduledUnit task;
        /** The future handed back to the requester of the slot. */
        private final CompletableFuture<SimpleSlot> future;

        /**
         * @param scheduledUnit the waiting task
         * @param completableFuture the future to complete with the allocated slot
         */
        public QueuedTask(ScheduledUnit scheduledUnit, CompletableFuture<SimpleSlot> completableFuture) {
            task = scheduledUnit;
            future = completableFuture;
        }

        /** @return the waiting task */
        public ScheduledUnit getTask() {
            return task;
        }

        /** @return the future associated with the waiting task */
        public CompletableFuture<SimpleSlot> getFuture() {
            return future;
        }
    }

    public Scheduler(Executor executor) {
        this.executor = (Executor) Preconditions.checkNotNull(executor);
    }

    /**
     * Shuts the scheduler down: detaches the slot listener from every registered instance,
     * cancels and releases all of their slots, and clears all bookkeeping collections and the
     * queue of waiting tasks. Runs under the global lock.
     */
    public void shutdown() {
        synchronized (this.globalLock) {
            final Iterator<Instance> registered = this.allInstances.iterator();
            while (registered.hasNext()) {
                final Instance current = registered.next();
                current.removeSlotListener();
                current.cancelAndReleaseAllSlots();
            }

            this.allInstances.clear();
            this.allInstancesByHost.clear();
            this.instancesWithAvailableResources.clear();
            this.taskQueue.clear();
        }
    }

    /**
     * Requests a slot for the given task and always answers with a future.
     *
     * <p>A slot that could be allocated right away is wrapped into an already completed future, a
     * future returned by {@code scheduleTask} is passed through, and a
     * {@link NoResourceAvailableException} is turned into an exceptionally completed future.
     *
     * @param scheduledUnit the task to allocate a slot for
     * @param z forwarded to {@code scheduleTask}; whether the request may be queued
     * @return a future for the slot
     * @throws RuntimeException if {@code scheduleTask} returns neither a slot nor a future
     */
    @Override
    public Future<SimpleSlot> allocateSlot(ScheduledUnit scheduledUnit, boolean z) {
        try {
            final Object outcome = scheduleTask(scheduledUnit, z);
            if (outcome instanceof SimpleSlot) {
                final SimpleSlot slot = (SimpleSlot) outcome;
                return FlinkCompletableFuture.completed(slot);
            } else if (outcome instanceof Future) {
                @SuppressWarnings("unchecked") // scheduleTask only ever hands out futures of SimpleSlot
                final Future<SimpleSlot> pending = (Future<SimpleSlot>) outcome;
                return pending;
            } else {
                throw new RuntimeException();
            }
        } catch (NoResourceAvailableException noResources) {
            return FlinkCompletableFuture.completedExceptionally(noResources);
        }
    }

    /**
     * Finds a slot for the given task, either immediately or (optionally) by queueing the request.
     *
     * <p>Without a slot sharing group, a free simple slot is requested; if none is available and
     * {@code z} is set, the request is put into the task queue and its (uncompleted) future is
     * returned. With a slot sharing group, a slot from the group is used if its locality is not
     * NON_LOCAL; otherwise a new shared slot is requested and the better of the two candidates is
     * kept while the other one is released.
     *
     * <p>The whole decision runs under the global lock.
     *
     * @param scheduledUnit the task to schedule
     * @param z whether the request may be queued when no slot is available right now
     * @return a {@link SimpleSlot} if one was allocated, or the future of the queued request
     * @throws NoResourceAvailableException if no slot is available and queueing is not allowed,
     *     or if the instance required by an assigned co-location constraint has no slot
     * @throws IllegalArgumentException if a task with a slot sharing group is scheduled with
     *     {@code z} set
     * @throws NullPointerException if {@code scheduledUnit} is null
     */
    private Object scheduleTask(ScheduledUnit scheduledUnit, boolean z) throws NoResourceAvailableException {
        if (scheduledUnit == null) {
            throw new NullPointerException();
        }
        LOG.debug("Scheduling task {}", scheduledUnit);

        ExecutionVertex vertex = scheduledUnit.getTaskToExecute().getVertex();
        Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();

        synchronized (this.globalLock) {
            SlotSharingGroup sharingGroup = scheduledUnit.getSlotSharingGroup();

            if (sharingGroup == null) {
                // Plain scheduling: take any free slot, preferring the input locations.
                SimpleSlot freeSlot = getFreeSlotForTask(vertex, preferredLocations, false);
                if (freeSlot != null) {
                    updateLocalityCounters(freeSlot, vertex);
                    return freeSlot;
                }
                if (z) {
                    // Nothing free right now: park the request until handleNewSlot serves it.
                    CompletableFuture<SimpleSlot> pending = new FlinkCompletableFuture<SimpleSlot>();
                    this.taskQueue.add(new QueuedTask(scheduledUnit, pending));
                    return pending;
                }
                throw new NoResourceAvailableException(getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
            }

            if (z) {
                throw new IllegalArgumentException("A task with a vertex sharing group was scheduled in a queued fashion.");
            }

            SlotSharingGroupAssignment assignment = sharingGroup.getTaskAssignment();
            CoLocationConstraint constraint = scheduledUnit.getLocationConstraint();
            SimpleSlot slotFromGroup = constraint == null
                    ? assignment.getSlotForTask(vertex)
                    : assignment.getSlotForTask(vertex, constraint);
            SimpleSlot newSlot = null;

            // Everything below may hold up to two candidate slots; the try makes sure that they
            // are handed back if anything unexpected goes wrong before one of them is returned.
            try {
                // A slot from the group that is not NON_LOCAL is used as it is.
                if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) {
                    if (constraint != null && !constraint.isAssigned()) {
                        constraint.lockLocation();
                    }
                    updateLocalityCounters(slotFromGroup, vertex);
                    return slotFromGroup;
                }

                // Otherwise try to get a new shared slot. An assigned co-location constraint
                // pins the search to its location; without one the input locations are a hint.
                Iterable<TaskManagerLocation> locations;
                boolean localOnly;
                if (constraint != null && constraint.isAssigned()) {
                    locations = Collections.singleton(constraint.getLocation());
                    localOnly = true;
                } else {
                    locations = vertex.getPreferredLocationsBasedOnInputs();
                    localOnly = false;
                }
                newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly);

                SimpleSlot toUse;
                if (newSlot == null) {
                    if (slotFromGroup == null) {
                        if (constraint != null && constraint.isAssigned()) {
                            throw new NoResourceAvailableException("Could not allocate a slot on instance " + constraint.getLocation() + ", as required by the co-location constraint.");
                        }
                        throw new NoResourceAvailableException(scheduledUnit, getNumberOfAvailableInstances(), getTotalNumberOfSlots(), getNumberOfAvailableSlots());
                    }
                    // No new slot, so the non-local slot from the group has to do.
                    toUse = slotFromGroup;
                } else if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                    // The new slot wins; hand back the one from the group, if there was one.
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                        slotFromGroup = null; // already released, must not be released again below
                    }
                    toUse = newSlot;
                } else {
                    // Both are usable and the new one is no better: keep the slot from the group.
                    newSlot.releaseSlot();
                    newSlot = null; // already released, must not be released again below
                    toUse = slotFromGroup;
                }

                if (constraint != null && !constraint.isAssigned()) {
                    constraint.lockLocation();
                }
                updateLocalityCounters(toUse, vertex);
                return toUse;
            } catch (NoResourceAvailableException e) {
                // Expected outcome; it is reported to the caller unchanged.
                throw e;
            } catch (Throwable th) {
                if (slotFromGroup != null) {
                    slotFromGroup.releaseSlot();
                }
                if (newSlot != null) {
                    newSlot.releaseSlot();
                }
                ExceptionUtils.rethrow(th, "An error occurred while allocating a slot in a sharing group");
                // NOTE(review): rethrow is expected to always throw; this return only satisfies
                // the compiler.
                return null;
            }
        }
    }

    /**
     * Allocates a simple slot for the given vertex on a suitable instance.
     *
     * <p>Candidate instances are taken from {@code findInstance} one after the other until one of
     * them yields a slot or no candidate is left. An instance that still has resources after the
     * allocation attempt is put back into the set of available instances; an instance that turns
     * out to have died is removed from the scheduler and the search continues.
     *
     * <p>This method does not synchronize itself; the caller visible in this class invokes it
     * while holding the global lock.
     *
     * @param executionVertex the vertex to allocate the slot for
     * @param iterable the preferred locations, may be null or empty
     * @param z whether only instances at the preferred locations may be used
     * @return the allocated slot with its locality set, or null if no instance could provide one
     */
    protected SimpleSlot getFreeSlotForTask(ExecutionVertex executionVertex, Iterable<TaskManagerLocation> iterable, boolean z) {
        while (true) {
            Pair<Instance, Locality> candidate = findInstance(iterable, z);
            if (candidate == null) {
                return null;
            }
            Instance instance = candidate.getLeft();
            Locality locality = candidate.getRight();
            try {
                SimpleSlot slot = instance.allocateSimpleSlot(executionVertex.getJobId());
                // findInstance removed the instance from the map; re-add it if it has more to offer.
                if (instance.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                }
                if (slot != null) {
                    slot.setLocality(locality);
                    return slot;
                }
            } catch (InstanceDiedException e) {
                // The death of the instance had not reached the scheduler yet; forget the instance.
                removeInstance(instance);
            }
            // No slot from this candidate: fall through and try the next one.
        }
    }

    /**
     * Allocates a new shared slot for the sharing group and carves a sub-slot for the vertex out
     * of it.
     *
     * <p>Candidates come from {@code findInstance} and are tried until a sub-slot was obtained or
     * no candidate is left. A shared slot that does not yield a sub-slot is released again; an
     * instance that died is removed from the scheduler.
     *
     * @param executionVertex the vertex that needs the slot
     * @param iterable the preferred locations, may be null or empty
     * @param slotSharingGroupAssignment the assignment the new shared slot is added to
     * @param coLocationConstraint the co-location constraint of the task, or null
     * @param z whether only instances at the preferred locations may be used
     * @return the sub-slot for the vertex, or null if no instance could provide one
     */
    protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex executionVertex, Iterable<TaskManagerLocation> iterable, SlotSharingGroupAssignment slotSharingGroupAssignment, CoLocationConstraint coLocationConstraint, boolean z) {
        for (;;) {
            final Pair<Instance, Locality> candidate = findInstance(iterable, z);
            if (candidate == null) {
                return null;
            }

            final Instance instance = candidate.getLeft();
            final Locality locality = candidate.getRight();

            try {
                final JobVertexID vertexId = executionVertex.getJobvertexId();
                final SharedSlot sharedSlot = instance.allocateSharedSlot(executionVertex.getJobId(), slotSharingGroupAssignment);

                // The candidate was taken out of the map by findInstance; return it if it has more.
                if (instance.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                }

                if (sharedSlot == null) {
                    continue;
                }

                final SimpleSlot subSlot;
                if (coLocationConstraint != null) {
                    subSlot = slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, coLocationConstraint);
                } else {
                    subSlot = slotSharingGroupAssignment.addSharedSlotAndAllocateSubSlot(sharedSlot, locality, vertexId);
                }

                if (subSlot != null) {
                    return subSlot;
                }
                // No sub-slot could be created in the shared slot, so give the shared slot back.
                sharedSlot.releaseSlot();
            } catch (InstanceDiedException e) {
                removeInstance(instance);
            }
        }
    }

    /**
     * Picks an instance to allocate from and removes it from the set of available instances.
     *
     * <p>Queued newly available instances are moved into the set first. Without preferred
     * locations the first available instance is returned as UNCONSTRAINED. With preferred
     * locations the first one that has an available instance is returned as LOCAL; if none
     * matches, the result is null when {@code z} is set and otherwise the first available
     * instance as NON_LOCAL.
     *
     * @param iterable the preferred locations, may be null or empty
     * @param z whether only instances at the preferred locations may be returned
     * @return the chosen instance and its locality, or null if there is no suitable instance
     */
    private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> iterable, boolean z) {
        // Drain the queue of newly available instances into the map of candidates.
        while (this.newlyAvailableInstances.size() > 0) {
            final Instance queued = this.newlyAvailableInstances.poll();
            if (queued == null) {
                continue;
            }
            this.instancesWithAvailableResources.put(queued.getTaskManagerID(), queued);
        }

        if (this.instancesWithAvailableResources.isEmpty()) {
            return null;
        }

        final Iterator<TaskManagerLocation> preferred = (iterable != null) ? iterable.iterator() : null;
        final Locality fallbackLocality;

        if (preferred != null && preferred.hasNext()) {
            while (preferred.hasNext()) {
                final TaskManagerLocation location = preferred.next();
                if (location == null) {
                    continue;
                }
                final Instance match = this.instancesWithAvailableResources.remove(location.getResourceID());
                if (match != null) {
                    return new ImmutablePair<Instance, Locality>(match, Locality.LOCAL);
                }
            }
            if (z) {
                // Only preferred locations are acceptable and none of them is available.
                return null;
            }
            fallbackLocality = Locality.NON_LOCAL;
        } else {
            fallbackLocality = Locality.UNCONSTRAINED;
        }

        // Take the first entry of the map; the map is known to be non-empty at this point.
        final Iterator<Instance> available = this.instancesWithAvailableResources.values().iterator();
        final Instance first = available.next();
        available.remove();
        return new ImmutablePair<Instance, Locality>(first, fallbackLocality);
    }

    /**
     * Records that the given instance has a slot available and hands a call to
     * {@code handleNewSlot} to the executor.
     *
     * @param instance the instance with the available slot
     */
    @Override
    public void newSlotAvailable(Instance instance) {
        this.newlyAvailableInstances.add(instance);

        final Runnable slotHandler = new Runnable() {
            @Override
            public void run() {
                handleNewSlot();
            }
        };
        this.executor.execute(slotHandler);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes one entry of the queue of newly available instances.
     *
     * <p>If a task is waiting in the task queue, a simple slot is allocated for it on the instance
     * and the task's future is completed with that slot. If no task is waiting, the instance is
     * added to the set of instances with available resources. Nothing happens if the queue is
     * empty or the polled instance has no resources available. Runs under the global lock.
     */
    public void handleNewSlot() {
        synchronized (this.globalLock) {
            final Instance instance = this.newlyAvailableInstances.poll();
            final boolean usable = instance != null && instance.hasResourcesAvailable();
            if (!usable) {
                return;
            }

            final QueuedTask waiting = this.taskQueue.peek();
            if (waiting == null) {
                // Nobody is waiting: just remember that the instance has something to offer.
                this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                return;
            }

            final ScheduledUnit unit = waiting.getTask();
            final ExecutionVertex vertex = unit.getTaskToExecute().getVertex();
            try {
                final SimpleSlot slot = instance.allocateSimpleSlot(vertex.getJobId());
                if (slot == null) {
                    return;
                }

                // The head of the queue is served; only now is it actually removed.
                this.taskQueue.poll();

                final CompletableFuture<SimpleSlot> future = waiting.getFuture();
                if (future == null) {
                    return;
                }
                try {
                    future.complete(slot);
                } catch (Throwable th) {
                    LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), th);
                    unit.getTaskToExecute().fail(th);
                }
            } catch (InstanceDiedException e) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Instance " + instance + " was marked dead asynchronously.");
                }
                removeInstance(instance);
            }
        }
    }

    /**
     * Increments the assignment counter that matches the locality of the slot and, if debug
     * logging is enabled, logs the assignment.
     *
     * @param simpleSlot the slot that was assigned
     * @param executionVertex the vertex the slot was assigned to
     * @throws RuntimeException if the locality of the slot is none of UNCONSTRAINED, LOCAL and
     *     NON_LOCAL
     */
    private void updateLocalityCounters(SimpleSlot simpleSlot, ExecutionVertex executionVertex) {
        final Locality locality = simpleSlot.getLocality();

        // Bump the counter and pick the matching log prefix in one pass.
        final String logPrefix;
        switch (locality) {
            case UNCONSTRAINED:
                this.unconstrainedAssignments++;
                logPrefix = "Unconstrained assignment: ";
                break;
            case LOCAL:
                this.localizedAssignments++;
                logPrefix = "Local assignment: ";
                break;
            case NON_LOCAL:
                this.nonLocalizedAssignments++;
                logPrefix = "Non-local assignment: ";
                break;
            default:
                throw new RuntimeException(locality.name());
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(logPrefix + executionVertex.getTaskNameWithSubtaskIndex() + " --> " + simpleSlot);
        }
    }

    /**
     * Registers a new instance with the scheduler.
     *
     * <p>The scheduler installs itself as the slot availability listener of the instance, files
     * the instance under its hostname and in the set of instances with available resources, and
     * announces the available slots of the instance through {@code newSlotAvailable}. If any of
     * these steps fails, the error is logged and the instance is removed again.
     *
     * @param instance the instance to add
     * @throws IllegalArgumentException if the instance is null, has no available slots, is not
     *     alive, or is already registered
     */
    @Override
    public void newInstanceAvailable(Instance instance) {
        if (instance == null) {
            throw new IllegalArgumentException();
        }
        if (instance.getNumberOfAvailableSlots() < 1) {
            throw new IllegalArgumentException("The given instance has no resources.");
        }
        if (!instance.isAlive()) {
            throw new IllegalArgumentException("The instance is not alive.");
        }

        synchronized (this.globalLock) {
            final boolean newlyAdded = this.allInstances.add(instance);
            if (!newlyAdded) {
                throw new IllegalArgumentException("The instance is already contained.");
            }

            try {
                instance.setSlotAvailabilityListener(this);

                final String host = instance.getTaskManagerLocation().getHostname();
                Set<Instance> instancesOnHost = this.allInstancesByHost.get(host);
                if (instancesOnHost == null) {
                    instancesOnHost = new HashSet<Instance>();
                    this.allInstancesByHost.put(host, instancesOnHost);
                }
                instancesOnHost.add(instance);

                this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);

                // The bound is re-read on every round on purpose, exactly like a plain for loop.
                int announced = 0;
                while (announced < instance.getNumberOfAvailableSlots()) {
                    newSlotAvailable(instance);
                    announced++;
                }
            } catch (Throwable th) {
                LOG.error("Scheduler could not add new instance " + instance, th);
                removeInstance(instance);
            }
        }
    }

    /**
     * Marks the given instance as dead and removes it from the scheduler's bookkeeping.
     *
     * @param instance the instance that died
     * @throws IllegalArgumentException if the instance is null
     */
    @Override
    public void instanceDied(Instance instance) {
        if (null == instance) {
            throw new IllegalArgumentException();
        }

        // The instance is marked dead before the lock is taken; only the removal is guarded.
        instance.markDead();

        synchronized (this.globalLock) {
            removeInstance(instance);
        }
    }

    /**
     * Drops the instance from all bookkeeping collections: the set of all instances, the map of
     * instances with available resources, and the per-host index (the host entry is dropped as
     * well once its last instance is gone).
     *
     * <p>This method does not synchronize itself.
     *
     * @param instance the instance to remove
     * @throws NullPointerException if the instance is null
     */
    private void removeInstance(Instance instance) {
        if (null == instance) {
            throw new NullPointerException();
        }

        this.allInstances.remove(instance);
        this.instancesWithAvailableResources.remove(instance.getTaskManagerID());

        final String host = instance.getTaskManagerLocation().getHostname();
        final Set<Instance> instancesOnHost = this.allInstancesByHost.get(host);
        if (instancesOnHost == null) {
            return;
        }

        instancesOnHost.remove(instance);
        if (instancesOnHost.isEmpty()) {
            this.allInstancesByHost.remove(host);
        }
    }

    /**
     * Sums the available slots over all instances that currently have available resources, after
     * the queue of newly available instances has been processed. Runs under the global lock.
     *
     * @return the number of available slots
     */
    public int getNumberOfAvailableSlots() {
        int availableSlots = 0;
        synchronized (this.globalLock) {
            processNewlyAvailableInstances();
            for (Instance instance : this.instancesWithAvailableResources.values()) {
                availableSlots += instance.getNumberOfAvailableSlots();
            }
        }
        return availableSlots;
    }

    /**
     * Sums the total number of slots over all registered instances that are alive. Runs under
     * the global lock.
     *
     * @return the total number of slots of the live instances
     */
    public int getTotalNumberOfSlots() {
        int totalSlots = 0;
        synchronized (this.globalLock) {
            final Iterator<Instance> registered = this.allInstances.iterator();
            while (registered.hasNext()) {
                final Instance instance = registered.next();
                if (!instance.isAlive()) {
                    continue;
                }
                totalSlots += instance.getTotalNumberOfSlots();
            }
        }
        return totalSlots;
    }

    /**
     * Counts the registered instances that are alive. Runs under the global lock.
     *
     * @return the number of live instances
     */
    public int getNumberOfAvailableInstances() {
        int liveInstances = 0;
        synchronized (this.globalLock) {
            for (Instance instance : this.allInstances) {
                if (instance.isAlive()) {
                    liveInstances++;
                }
            }
        }
        return liveInstances;
    }

    /**
     * Returns how many instances are currently tracked as having available resources, after the
     * queue of newly available instances has been processed. Runs under the global lock.
     *
     * @return the number of instances with available resources
     */
    public int getNumberOfInstancesWithAvailableSlots() {
        synchronized (this.globalLock) {
            processNewlyAvailableInstances();
            return this.instancesWithAvailableResources.size();
        }
    }

    /**
     * Creates a snapshot of the registered instances grouped by hostname. Both the returned map
     * and the lists in it are fresh copies made under the global lock.
     *
     * @return a new map from hostname to the instances on that host
     */
    public Map<String, List<Instance>> getInstancesByHost() {
        synchronized (this.globalLock) {
            final Map<String, List<Instance>> snapshot = new HashMap<String, List<Instance>>();
            for (Map.Entry<String, Set<Instance>> hostEntry : this.allInstancesByHost.entrySet()) {
                final List<Instance> instancesOnHost = new ArrayList<Instance>(hostEntry.getValue());
                snapshot.put(hostEntry.getKey(), instancesOnHost);
            }
            return snapshot;
        }
    }

    /**
     * Reads the counter of UNCONSTRAINED assignments. The read is not synchronized.
     *
     * @return the number of assignments counted as unconstrained
     */
    public int getNumberOfUnconstrainedAssignments() {
        final int count = this.unconstrainedAssignments;
        return count;
    }

    /**
     * Reads the counter of LOCAL assignments. The read is not synchronized.
     *
     * @return the number of assignments counted as local
     */
    public int getNumberOfLocalizedAssignments() {
        final int count = this.localizedAssignments;
        return count;
    }

    /**
     * Reads the counter of NON_LOCAL assignments. The read is not synchronized.
     *
     * @return the number of assignments counted as non-local
     */
    public int getNumberOfNonLocalizedAssignments() {
        final int count = this.nonLocalizedAssignments;
        return count;
    }

    /**
     * Drains the queue of newly available instances. Every polled instance that has resources
     * available is put into the map of instances with available resources; the others are
     * dropped. The loop ends as soon as the queue reports that it is empty. Runs under the global
     * lock.
     */
    private void processNewlyAvailableInstances() {
        synchronized (this.globalLock) {
            Instance instance;
            // poll() returns null on an empty queue, which is the only exit of this loop.
            while ((instance = this.newlyAvailableInstances.poll()) != null) {
                if (instance.hasResourcesAvailable()) {
                    this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance);
                }
            }
        }
    }

    /**
     * Joins the hostnames of the given locations into one string, separated by {@code ", "}.
     *
     * @param iterable the locations whose hostnames are listed
     * @return the comma-separated hostnames; empty if there are no locations
     */
    private static String getHostnamesFromInstances(Iterable<TaskManagerLocation> iterable) {
        final StringBuilder hostnames = new StringBuilder();
        String separator = "";
        for (TaskManagerLocation location : iterable) {
            // The separator is empty for the first element and ", " for every later one.
            hostnames.append(separator);
            separator = ", ";
            hostnames.append(location.getHostname());
        }
        return hostnames.toString();
    }
}
