package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/**
 * An {@link ExecutionSlotAllocator} that requests one physical slot per execution attempt from a
 * {@link PhysicalSlotProvider} and wraps each granted slot in a {@link SingleLogicalSlot}.
 *
 * <p>Every outstanding or fulfilled request is tracked in {@code requestedPhysicalSlots}, keyed by
 * both the {@link ExecutionAttemptID} and the {@link SlotRequestId}. An entry is removed again when
 * the request fails or is cancelled, when the logical slot is returned, or when the physical slot
 * releases its payload.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; presumably all calls are
 * made from a single thread — confirm against the callers.
 */
public class SimpleExecutionSlotAllocator implements ExecutionSlotAllocator {
    private final PhysicalSlotProvider slotProvider;
    private final boolean slotWillBeOccupiedIndefinitely;
    private final Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever;

    /** Slot futures addressable by execution attempt (key A) and by slot request (key B). */
    private final DualKeyLinkedMap<ExecutionAttemptID, SlotRequestId, CompletableFuture<LogicalSlot>> requestedPhysicalSlots = new DualKeyLinkedMap<>();

    /** Factory that creates {@link SimpleExecutionSlotAllocator} instances. */
    public static class Factory implements ExecutionSlotAllocatorFactory {
        private final PhysicalSlotProvider slotProvider;
        private final boolean slotWillBeOccupiedIndefinitely;

        /**
         * @param slotProvider provider handed to every allocator created by this factory
         * @param slotWillBeOccupiedIndefinitely flag forwarded to every physical slot request and
         *     logical slot created by the allocator
         */
        public Factory(PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely) {
            this.slotProvider = slotProvider;
            this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        }

        /**
         * Creates an allocator whose resource profiles are looked up in the given context via the
         * execution vertex id of each execution attempt.
         */
        @Override
        public ExecutionSlotAllocator createInstance(ExecutionSlotAllocationContext context) {
            return new SimpleExecutionSlotAllocator(
                    this.slotProvider,
                    executionAttemptId -> context.getResourceProfile(executionAttemptId.getExecutionVertexId()),
                    this.slotWillBeOccupiedIndefinitely);
        }
    }

    /**
     * Payload assigned to a physical slot; it forwards a release of the physical slot to the
     * logical slot and to the enclosing allocator's bookkeeping.
     */
    public class LogicalSlotHolder implements PhysicalSlot.Payload {
        private final SingleLogicalSlot logicalSlot;

        private LogicalSlotHolder(SingleLogicalSlot logicalSlot) {
            this.logicalSlot = Preconditions.checkNotNull(logicalSlot);
        }

        /**
         * Releases the logical slot with the given cause, then drops the allocator's entry for it
         * and cancels the underlying slot request.
         */
        @Override
        public void release(Throwable cause) {
            this.logicalSlot.release(cause);
            SimpleExecutionSlotAllocator.this.releaseSlot(this.logicalSlot, new FlinkException("Physical slot releases its payload."));
        }

        @Override
        public boolean willOccupySlotIndefinitely() {
            return this.logicalSlot.willOccupySlotIndefinitely();
        }
    }

    /**
     * @param slotProvider provider used to request and cancel physical slots; must not be null
     * @param resourceProfileRetriever maps an execution attempt to the resource profile to
     *     request for it; must not be null
     * @param slotWillBeOccupiedIndefinitely flag forwarded to every physical slot request and
     *     logical slot created by this allocator
     */
    SimpleExecutionSlotAllocator(
            PhysicalSlotProvider slotProvider,
            Function<ExecutionAttemptID, ResourceProfile> resourceProfileRetriever,
            boolean slotWillBeOccupiedIndefinitely) {
        this.slotProvider = Preconditions.checkNotNull(slotProvider);
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.resourceProfileRetriever = Preconditions.checkNotNull(resourceProfileRetriever);
    }

    /**
     * Requests a slot for each given execution attempt.
     *
     * @param executionAttemptIds the execution attempts that need a slot
     * @return one assignment per input element, in input order, each carrying the future of the
     *     logical slot for that attempt
     */
    @Override
    public List<ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> executionAttemptIds) {
        return executionAttemptIds.stream()
                .map(executionAttemptId -> new ExecutionSlotAssignment(executionAttemptId, allocateSlotFor(executionAttemptId)))
                .collect(Collectors.toList());
    }

    /**
     * Returns the slot future already tracked for the execution attempt, or issues a new physical
     * slot request and tracks its future.
     */
    private CompletableFuture<LogicalSlot> allocateSlotFor(ExecutionAttemptID executionAttemptId) {
        final CompletableFuture<LogicalSlot> trackedFuture = this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
        if (trackedFuture != null) {
            return trackedFuture;
        }

        final SlotRequestId slotRequestId = new SlotRequestId();
        final ResourceProfile resourceProfile = this.resourceProfileRetriever.apply(executionAttemptId);
        final SlotProfile slotProfile = SlotProfile.priorAllocation(
                resourceProfile,
                resourceProfile,
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptySet());
        final PhysicalSlotRequest request = new PhysicalSlotRequest(slotRequestId, slotProfile, this.slotWillBeOccupiedIndefinitely);

        final CompletableFuture<LogicalSlot> slotFuture = this.slotProvider
                .allocatePhysicalSlot(request)
                .thenApply(result -> allocateLogicalSlotFromPhysicalSlot(slotRequestId, result.getPhysicalSlot(), this.slotWillBeOccupiedIndefinitely));

        // Track the future BEFORE attaching the failure handler: if the future has already
        // completed exceptionally, the handler runs immediately and must find the entry in
        // order to remove it; otherwise a failed future would stay in the map.
        this.requestedPhysicalSlots.put(executionAttemptId, slotRequestId, slotFuture);
        slotFuture.exceptionally(failure -> {
            this.requestedPhysicalSlots.removeKeyA(executionAttemptId);
            this.slotProvider.cancelSlotRequest(slotRequestId, failure);
            return null;
        });
        return slotFuture;
    }

    /**
     * Cancels the slot future tracked for the given execution attempt, if there is one. The
     * cleanup of the bookkeeping entry and of the underlying slot request is done by the failure
     * handler attached to that future when it was created.
     */
    @Override
    public void cancel(ExecutionAttemptID executionAttemptId) {
        final CompletableFuture<LogicalSlot> slotFuture = this.requestedPhysicalSlots.getValueByKeyA(executionAttemptId);
        if (slotFuture != null) {
            slotFuture.cancel(false);
        }
    }

    /** Callback handed to each {@link SingleLogicalSlot}; releases the given slot. */
    private void returnLogicalSlot(LogicalSlot logicalSlot) {
        releaseSlot(logicalSlot, new FlinkException("Slot is being returned from SimpleExecutionSlotAllocator."));
    }

    /**
     * Removes the bookkeeping entry of the given logical slot and cancels its slot request at the
     * provider. Used internally and by {@link LogicalSlotHolder}.
     *
     * @param logicalSlot the slot whose request id identifies the entry to drop
     * @param cause the reason passed on to the slot provider
     */
    public void releaseSlot(LogicalSlot logicalSlot, Throwable cause) {
        final SlotRequestId slotRequestId = logicalSlot.getSlotRequestId();
        this.requestedPhysicalSlots.removeKeyB(slotRequestId);
        this.slotProvider.cancelSlotRequest(slotRequestId, cause);
    }

    /**
     * Wraps the physical slot in a {@link SingleLogicalSlot} and registers a
     * {@link LogicalSlotHolder} as the physical slot's payload.
     *
     * @throws IllegalStateException if the physical slot refuses the payload
     */
    private LogicalSlot allocateLogicalSlotFromPhysicalSlot(SlotRequestId slotRequestId, PhysicalSlot physicalSlot, boolean willBeOccupiedIndefinitely) {
        final SingleLogicalSlot logicalSlot = new SingleLogicalSlot(slotRequestId, physicalSlot, Locality.UNKNOWN, this::returnLogicalSlot, willBeOccupiedIndefinitely);
        if (!physicalSlot.tryAssignPayload(new LogicalSlotHolder(logicalSlot))) {
            throw new IllegalStateException("BUG: Unexpected physical slot payload assignment failure!");
        }
        return logicalSlot;
    }
}
