/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
import org.apache.flink.runtime.scheduler.ExecutionSlotSharingGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.DualKeyLinkedMap;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bookkeeping for the logical slot requests that are served from one physical slot.
 *
 * <p>The instance registers itself as the payload of the physical slot once the physical slot
 * future completes, and it is passed as the {@link SlotOwner} of every {@link SingleLogicalSlot}
 * it creates, so returned logical slots come back through {@link #returnLogicalSlot(LogicalSlot)}.
 * When the last logical slot request is removed, the external release callback is invoked once
 * with the {@link ExecutionSlotSharingGroup} of this shared slot.
 *
 * <p>NOTE(review): the class contains no synchronization; presumably all calls are confined to a
 * single thread - confirm against the callers.
 */
class SharedSlot implements SlotOwner, PhysicalSlot.Payload {
    private static final Logger LOG = LoggerFactory.getLogger(SharedSlot.class);

    /** Id of the request for the underlying physical slot; used in log and error messages. */
    private final SlotRequestId physicalSlotRequestId;

    private final ResourceProfile physicalSlotResourceProfile;

    private final ExecutionSlotSharingGroup executionSlotSharingGroup;

    /** Completes with the physical slot after this instance was assigned as its payload. */
    private final CompletableFuture<PhysicalSlot> slotContextFuture;

    /** Logical slot futures, addressable by execution vertex id and by logical slot request id. */
    private final DualKeyLinkedMap<
                    ExecutionVertexID, SlotRequestId, CompletableFuture<SingleLogicalSlot>>
            requestedLogicalSlots;

    private final boolean slotWillBeOccupiedIndefinitely;

    /** Invoked at most once, when the shared slot has no logical slot requests left. */
    private final Consumer<ExecutionSlotSharingGroup> externalReleaseCallback;

    private State state;

    /**
     * Creates the shared slot in the {@code ALLOCATED} state.
     *
     * @param physicalSlotRequestId id of the physical slot request
     * @param physicalSlotResourceProfile resource profile reported by
     *     {@link #getPhysicalSlotResourceProfile()}
     * @param executionSlotSharingGroup executions that are allowed to request logical slots here
     * @param slotContextFuture future of the physical slot; on completion this instance is
     *     assigned as the slot's payload, and a failed assignment fails the derived future
     * @param slotWillBeOccupiedIndefinitely value reported by {@link #willOccupySlotIndefinitely()}
     *     and passed to every created logical slot
     * @param externalReleaseCallback called with the sharing group once no logical slot request
     *     remains
     */
    SharedSlot(
            SlotRequestId physicalSlotRequestId,
            ResourceProfile physicalSlotResourceProfile,
            ExecutionSlotSharingGroup executionSlotSharingGroup,
            CompletableFuture<PhysicalSlot> slotContextFuture,
            boolean slotWillBeOccupiedIndefinitely,
            Consumer<ExecutionSlotSharingGroup> externalReleaseCallback) {
        this.physicalSlotRequestId = physicalSlotRequestId;
        this.physicalSlotResourceProfile = physicalSlotResourceProfile;
        this.executionSlotSharingGroup = executionSlotSharingGroup;
        this.slotContextFuture =
                slotContextFuture.thenApply(
                        physicalSlot -> {
                            Preconditions.checkState(
                                    physicalSlot.tryAssignPayload(this),
                                    "Unexpected physical slot payload assignment failure!");
                            return physicalSlot;
                        });
        this.requestedLogicalSlots = new DualKeyLinkedMap<>();
        this.slotWillBeOccupiedIndefinitely = slotWillBeOccupiedIndefinitely;
        this.externalReleaseCallback = externalReleaseCallback;
        this.state = State.ALLOCATED;
    }

    SlotRequestId getPhysicalSlotRequestId() {
        return physicalSlotRequestId;
    }

    ResourceProfile getPhysicalSlotResourceProfile() {
        return physicalSlotResourceProfile;
    }

    public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
        return executionSlotSharingGroup;
    }

    CompletableFuture<PhysicalSlot> getSlotContextFuture() {
        return slotContextFuture;
    }

    /**
     * Returns the logical slot future for the given execution vertex, creating the request if
     * none is registered yet.
     *
     * @param executionVertexId execution vertex that needs a logical slot
     * @return a new dependent future of the (possibly pre-existing) logical slot request
     * @throws IllegalArgumentException if the vertex is not part of the sharing group
     */
    CompletableFuture<LogicalSlot> allocateLogicalSlot(ExecutionVertexID executionVertexId) {
        Preconditions.checkArgument(
                executionSlotSharingGroup.getExecutionVertexIds().contains(executionVertexId),
                "Trying to allocate a logical slot for execution %s which is not in the ExecutionSlotSharingGroup",
                executionVertexId);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
                requestedLogicalSlots.getValueByKeyA(executionVertexId);
        if (logicalSlotFuture != null) {
            LOG.debug("Request for {} already exists", getLogicalSlotString(executionVertexId));
        } else {
            logicalSlotFuture = allocateNonExistentLogicalSlot(executionVertexId);
        }
        // Widens CompletableFuture<SingleLogicalSlot> to CompletableFuture<LogicalSlot>.
        return logicalSlotFuture.thenApply(Function.identity());
    }

    /**
     * Registers a new logical slot request that completes once the physical slot is available.
     *
     * @param executionVertexId execution vertex the logical slot is requested for
     * @return the registered logical slot future
     */
    private CompletableFuture<SingleLogicalSlot> allocateNonExistentLogicalSlot(
            ExecutionVertexID executionVertexId) {
        SlotRequestId logicalSlotRequestId = new SlotRequestId();
        String logMessageBase = getLogicalSlotString(logicalSlotRequestId, executionVertexId);
        LOG.debug("Request a {}", logMessageBase);

        CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
                slotContextFuture.thenApply(
                        physicalSlot -> {
                            LOG.debug("Allocated {}", logMessageBase);
                            return createLogicalSlot(physicalSlot, logicalSlotRequestId);
                        });
        requestedLogicalSlots.put(executionVertexId, logicalSlotRequestId, logicalSlotFuture);

        // Registered after the put on purpose: a failure of the physical slot future (or a
        // cancellation of the logical one) must find the request in the map to remove it.
        logicalSlotFuture.exceptionally(
                cause -> {
                    LOG.debug("Failed {}", logMessageBase, cause);
                    removeLogicalSlotRequest(logicalSlotRequestId);
                    return null;
                });
        return logicalSlotFuture;
    }

    private SingleLogicalSlot createLogicalSlot(
            PhysicalSlot physicalSlot, SlotRequestId logicalSlotRequestId) {
        return new SingleLogicalSlot(
                logicalSlotRequestId,
                physicalSlot,
                Locality.UNKNOWN,
                this,
                slotWillBeOccupiedIndefinitely);
    }

    /**
     * Cancels or fails the logical slot request of the given execution vertex, if there is one.
     *
     * @param executionVertexID execution vertex whose request is cancelled
     * @param cause failure to complete the request with; {@code null} cancels it instead
     * @throws IllegalStateException if this shared slot has already been released
     */
    void cancelLogicalSlotRequest(ExecutionVertexID executionVertexID, @Nullable Throwable cause) {
        Preconditions.checkState(
                state == State.ALLOCATED,
                "SharedSlot (physical request %s) has been released",
                physicalSlotRequestId);
        CompletableFuture<SingleLogicalSlot> logicalSlotFuture =
                requestedLogicalSlots.getValueByKeyA(executionVertexID);
        SlotRequestId logicalSlotRequestId =
                requestedLogicalSlots.getKeyBByKeyA(executionVertexID);
        if (logicalSlotFuture != null) {
            LOG.debug(
                    "Cancel {} from {}",
                    getLogicalSlotString(logicalSlotRequestId),
                    executionVertexID);
            // If the future is still pending, completing it here triggers the exceptionally
            // callback registered at allocation time, which removes the request. An already
            // completed future is not affected by either call.
            if (cause == null) {
                logicalSlotFuture.cancel(false);
            } else {
                logicalSlotFuture.completeExceptionally(cause);
            }
        } else {
            // NOTE(review): logicalSlotRequestId is presumably null in this branch - confirm
            // whether the execution vertex id should be logged instead.
            LOG.debug(
                    "No request for logical {} from physical {}",
                    logicalSlotRequestId,
                    physicalSlotRequestId);
        }
    }

    /**
     * Removes the request of the returned logical slot; releases the shared slot externally if
     * it was the last one.
     *
     * @param logicalSlot logical slot that is handed back to this owner
     */
    @Override
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        removeLogicalSlotRequest(logicalSlot.getSlotRequestId());
    }

    /**
     * Drops the request with the given id and triggers the external release if none is left.
     *
     * @throws IllegalStateException if no request with this id is registered
     */
    private void removeLogicalSlotRequest(SlotRequestId logicalSlotRequestId) {
        // The description is built before the removal because it looks up the vertex id.
        LOG.debug("Remove {}", getLogicalSlotString(logicalSlotRequestId));
        Preconditions.checkState(
                requestedLogicalSlots.removeKeyB(logicalSlotRequestId) != null,
                "Trying to remove a logical slot request which has been either already removed or never created.");
        releaseExternally();
    }

    /**
     * Releases every registered logical slot with the given cause, clears all requests and
     * triggers the external release.
     *
     * @param cause reason that is passed on to each logical slot
     * @throws IllegalStateException if the physical slot future or any logical slot future is
     *     not done yet
     */
    @Override
    public void release(Throwable cause) {
        Preconditions.checkState(
                slotContextFuture.isDone(),
                "Releasing of the shared slot is expected only from its successfully allocated physical slot (%s)",
                physicalSlotRequestId);
        LOG.debug("Release shared slot ({})", physicalSlotRequestId);

        // Work on a snapshot: presumably releasing a logical slot can call back into
        // returnLogicalSlot and modify requestedLogicalSlots while we iterate - verify.
        Map<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> logicalSlotFutures =
                requestedLogicalSlots.keySetA().stream()
                        .collect(
                                Collectors.toMap(
                                        executionVertexId -> executionVertexId,
                                        requestedLogicalSlots::getValueByKeyA));
        for (Map.Entry<ExecutionVertexID, CompletableFuture<SingleLogicalSlot>> entry :
                logicalSlotFutures.entrySet()) {
            LOG.debug("Release {}", getLogicalSlotString(entry.getKey()));
            CompletableFuture<SingleLogicalSlot> logicalSlotFuture = entry.getValue();
            Preconditions.checkNotNull(logicalSlotFuture);
            Preconditions.checkState(
                    logicalSlotFuture.isDone(),
                    "Logical slot future must already done when release call comes from the successfully allocated physical slot (%s)",
                    physicalSlotRequestId);
            logicalSlotFuture.thenAccept(logicalSlot -> logicalSlot.release(cause));
        }
        requestedLogicalSlots.clear();
        releaseExternally();
    }

    /** Marks the slot as released and notifies the callback, once, when no request remains. */
    private void releaseExternally() {
        if (state != State.RELEASED && requestedLogicalSlots.values().isEmpty()) {
            state = State.RELEASED;
            LOG.debug("Release shared slot externally ({})", physicalSlotRequestId);
            externalReleaseCallback.accept(executionSlotSharingGroup);
        }
    }

    @Override
    public boolean willOccupySlotIndefinitely() {
        return slotWillBeOccupiedIndefinitely;
    }

    /** Describes a logical slot for logging, looking up its execution vertex id. */
    private String getLogicalSlotString(SlotRequestId logicalSlotRequestId) {
        return getLogicalSlotString(
                logicalSlotRequestId, requestedLogicalSlots.getKeyAByKeyB(logicalSlotRequestId));
    }

    /** Describes a logical slot for logging, looking up its logical slot request id. */
    private String getLogicalSlotString(ExecutionVertexID executionVertexId) {
        return getLogicalSlotString(
                requestedLogicalSlots.getKeyBByKeyA(executionVertexId), executionVertexId);
    }

    private String getLogicalSlotString(
            SlotRequestId logicalSlotRequestId, ExecutionVertexID executionVertexId) {
        return String.format(
                "logical slot (%s) for execution vertex (id %s) from the physical slot (%s)",
                logicalSlotRequestId, executionVertexId, physicalSlotRequestId);
    }

    /** Returns {@code true} if no logical slot request is currently registered. */
    boolean isEmpty() {
        return requestedLogicalSlots.size() == 0;
    }

    /** Lifecycle of the shared slot; {@code RELEASED} is set once by the external release. */
    private enum State {
        ALLOCATED,
        RELEASED
    }
}

