package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot.class */
public class SingleLogicalSlot implements LogicalSlot, AllocatedSlot.Payload {
    private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, LogicalSlot.Payload> PAYLOAD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SingleLogicalSlot.class, LogicalSlot.Payload.class, "payload");
    private static final AtomicReferenceFieldUpdater<SingleLogicalSlot, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(SingleLogicalSlot.class, State.class, JobDetailsInfo.FIELD_NAME_JOB_STATUS);
    private final SlotRequestId slotRequestId;
    private final SlotContext slotContext;

    @Nullable
    private final SlotSharingGroupId slotSharingGroupId;
    private final Locality locality;
    private final SlotOwner slotOwner;
    private final CompletableFuture<Void> releaseFuture = new CompletableFuture<>();
    private volatile State state = State.ALIVE;
    private volatile LogicalSlot.Payload payload = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlot$State.class */
    public enum State {
        ALIVE,
        RELEASING,
        RELEASED
    }

    public SingleLogicalSlot(SlotRequestId slotRequestId, SlotContext slotContext, @Nullable SlotSharingGroupId slotSharingGroupId, Locality locality, SlotOwner slotOwner) {
        this.slotRequestId = (SlotRequestId) Preconditions.checkNotNull(slotRequestId);
        this.slotContext = (SlotContext) Preconditions.checkNotNull(slotContext);
        this.slotSharingGroupId = slotSharingGroupId;
        this.locality = (Locality) Preconditions.checkNotNull(locality);
        this.slotOwner = (SlotOwner) Preconditions.checkNotNull(slotOwner);
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public TaskManagerLocation getTaskManagerLocation() {
        return this.slotContext.getTaskManagerLocation();
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public TaskManagerGateway getTaskManagerGateway() {
        return this.slotContext.getTaskManagerGateway();
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public Locality getLocality() {
        return this.locality;
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public boolean isAlive() {
        return this.state == State.ALIVE;
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public boolean tryAssignPayload(LogicalSlot.Payload payload) {
        return PAYLOAD_UPDATER.compareAndSet(this, null, payload);
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    @Nullable
    public LogicalSlot.Payload getPayload() {
        return this.payload;
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public CompletableFuture<?> releaseSlot(@Nullable Throwable th) {
        if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
            returnSlotToOwner(signalPayloadRelease(th));
        }
        return this.releaseFuture;
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public int getPhysicalSlotNumber() {
        return this.slotContext.getPhysicalSlotNumber();
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public AllocationID getAllocationId() {
        return this.slotContext.getAllocationId();
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    public SlotRequestId getSlotRequestId() {
        return this.slotRequestId;
    }

    @Override // org.apache.flink.runtime.jobmaster.LogicalSlot
    @Nullable
    public SlotSharingGroupId getSlotSharingGroupId() {
        return this.slotSharingGroupId;
    }

    @Override // org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.Payload
    public void release(Throwable th) {
        if (STATE_UPDATER.compareAndSet(this, State.ALIVE, State.RELEASING)) {
            signalPayloadRelease(th);
        }
        markReleased();
        this.releaseFuture.complete(null);
    }

    private CompletableFuture<?> signalPayloadRelease(Throwable th) {
        tryAssignPayload(TERMINATED_PAYLOAD);
        this.payload.fail(th);
        return this.payload.getTerminalStateFuture();
    }

    private void returnSlotToOwner(CompletableFuture<?> completableFuture) {
        completableFuture.handle((obj, th) -> {
            return this.state == State.RELEASING ? this.slotOwner.returnAllocatedSlot(this) : CompletableFuture.completedFuture(true);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity()).whenComplete((obj2, th2) -> {
            markReleased();
            if (th2 != null) {
                this.releaseFuture.completeExceptionally(th2);
            } else {
                this.releaseFuture.complete(null);
            }
        });
    }

    private void markReleased() {
        this.state = State.RELEASED;
    }
}
