package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.class */
final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteRequestDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestDispatcherImpl.class);
    private final SupplierWithException<CheckpointStorageWorkerView, ? extends IOException> checkpointStorageWorkerViewSupplier;
    private CheckpointStorageWorkerView streamFactoryResolver;
    private final ChannelStateSerializer serializer;
    private final Set<SubtaskID> registeredSubtasks = new HashSet();
    private long ongoingCheckpointId = -1;
    private long maxAbortedCheckpointId = -1;
    private SubtaskID abortedSubtaskID;
    private Throwable abortedCause;
    private ChannelStateCheckpointWriter writer;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateWriteRequestDispatcherImpl(SupplierWithException<CheckpointStorageWorkerView, ? extends IOException> supplierWithException, ChannelStateSerializer channelStateSerializer) {
        this.checkpointStorageWorkerViewSupplier = supplierWithException;
        this.serializer = (ChannelStateSerializer) Preconditions.checkNotNull(channelStateSerializer);
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher
    public void dispatch(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        LOG.trace("process {}", channelStateWriteRequest);
        try {
            dispatchInternal(channelStateWriteRequest);
        } catch (Exception e) {
            try {
                channelStateWriteRequest.cancel(e);
            } catch (Exception e2) {
                e.addSuppressed(e2);
            }
            throw e;
        }
    }

    private void dispatchInternal(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        if (channelStateWriteRequest instanceof SubtaskRegisterRequest) {
            SubtaskRegisterRequest subtaskRegisterRequest = (SubtaskRegisterRequest) channelStateWriteRequest;
            this.registeredSubtasks.add(SubtaskID.of(subtaskRegisterRequest.getJobVertexID(), subtaskRegisterRequest.getSubtaskIndex()));
            return;
        }
        if (channelStateWriteRequest instanceof SubtaskReleaseRequest) {
            SubtaskReleaseRequest subtaskReleaseRequest = (SubtaskReleaseRequest) channelStateWriteRequest;
            SubtaskID of = SubtaskID.of(subtaskReleaseRequest.getJobVertexID(), subtaskReleaseRequest.getSubtaskIndex());
            this.registeredSubtasks.remove(of);
            if (this.writer == null) {
                return;
            }
            this.writer.releaseSubtask(of);
            return;
        }
        if (isAbortedCheckpoint(channelStateWriteRequest.getCheckpointId())) {
            handleAbortedRequest(channelStateWriteRequest);
            return;
        }
        if (channelStateWriteRequest instanceof CheckpointStartRequest) {
            handleCheckpointStartRequest(channelStateWriteRequest);
        } else if (channelStateWriteRequest instanceof CheckpointInProgressRequest) {
            handleCheckpointInProgressRequest((CheckpointInProgressRequest) channelStateWriteRequest);
        } else {
            if (!(channelStateWriteRequest instanceof CheckpointAbortRequest)) {
                throw new IllegalArgumentException("unknown request type: " + channelStateWriteRequest);
            }
            handleCheckpointAbortRequest(channelStateWriteRequest);
        }
    }

    private void handleAbortedRequest(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        if (channelStateWriteRequest.getCheckpointId() != this.maxAbortedCheckpointId) {
            channelStateWriteRequest.cancel(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
        } else if (SubtaskID.of(channelStateWriteRequest.getJobVertexID(), channelStateWriteRequest.getSubtaskIndex()).equals(this.abortedSubtaskID)) {
            channelStateWriteRequest.cancel(this.abortedCause);
        } else {
            channelStateWriteRequest.cancel(new CheckpointException(CheckpointFailureReason.CHANNEL_STATE_SHARED_STREAM_EXCEPTION, this.abortedCause));
        }
    }

    private void handleCheckpointStartRequest(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        Preconditions.checkState(channelStateWriteRequest.getCheckpointId() >= this.ongoingCheckpointId, String.format("Checkpoint must be incremented, ongoingCheckpointId is %s, but the request is %s.", Long.valueOf(this.ongoingCheckpointId), channelStateWriteRequest));
        if (channelStateWriteRequest.getCheckpointId() > this.ongoingCheckpointId) {
            failAndClearWriter(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
        }
        CheckpointStartRequest checkpointStartRequest = (CheckpointStartRequest) channelStateWriteRequest;
        if (this.writer == null) {
            this.writer = buildWriter(checkpointStartRequest);
            this.ongoingCheckpointId = channelStateWriteRequest.getCheckpointId();
        }
        this.writer.registerSubtaskResult(SubtaskID.of(checkpointStartRequest.getJobVertexID(), checkpointStartRequest.getSubtaskIndex()), checkpointStartRequest.getTargetResult());
    }

    private void handleCheckpointInProgressRequest(CheckpointInProgressRequest checkpointInProgressRequest) throws Exception {
        Preconditions.checkArgument(this.ongoingCheckpointId == checkpointInProgressRequest.getCheckpointId() && this.writer != null, "writer not found while processing request: " + checkpointInProgressRequest);
        checkpointInProgressRequest.execute(this.writer);
    }

    private void handleCheckpointAbortRequest(ChannelStateWriteRequest channelStateWriteRequest) {
        CheckpointAbortRequest checkpointAbortRequest = (CheckpointAbortRequest) channelStateWriteRequest;
        if (channelStateWriteRequest.getCheckpointId() > this.maxAbortedCheckpointId) {
            this.maxAbortedCheckpointId = checkpointAbortRequest.getCheckpointId();
            this.abortedCause = checkpointAbortRequest.getThrowable();
            this.abortedSubtaskID = SubtaskID.of(checkpointAbortRequest.getJobVertexID(), checkpointAbortRequest.getSubtaskIndex());
        }
        if (checkpointAbortRequest.getCheckpointId() == this.ongoingCheckpointId) {
            failAndClearWriter(checkpointAbortRequest.getJobVertexID(), checkpointAbortRequest.getSubtaskIndex(), checkpointAbortRequest.getThrowable());
        } else if (channelStateWriteRequest.getCheckpointId() > this.ongoingCheckpointId) {
            failAndClearWriter(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
        }
    }

    private boolean isAbortedCheckpoint(long j) {
        return j < this.ongoingCheckpointId || j <= this.maxAbortedCheckpointId;
    }

    private void failAndClearWriter(Throwable th) {
        if (this.writer == null) {
            return;
        }
        this.writer.fail(th);
        this.writer = null;
    }

    private void failAndClearWriter(JobVertexID jobVertexID, int i, Throwable th) {
        if (this.writer == null) {
            return;
        }
        this.writer.fail(jobVertexID, i, th);
        this.writer = null;
    }

    private ChannelStateCheckpointWriter buildWriter(CheckpointStartRequest checkpointStartRequest) throws Exception {
        return new ChannelStateCheckpointWriter(this.registeredSubtasks, checkpointStartRequest.getCheckpointId(), getStreamFactoryResolver().resolveCheckpointStorageLocation(checkpointStartRequest.getCheckpointId(), checkpointStartRequest.getLocationReference()), this.serializer, () -> {
            Preconditions.checkState(checkpointStartRequest.getCheckpointId() == this.ongoingCheckpointId, "The ongoingCheckpointId[%s] was changed when clear writer of checkpoint[%s], it might be a bug.", Long.valueOf(this.ongoingCheckpointId), Long.valueOf(checkpointStartRequest.getCheckpointId()));
            this.writer = null;
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcher
    public void fail(Throwable th) {
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.fail(th);
        } catch (Exception e) {
            LOG.warn("unable to fail write channel state writer", th);
        }
        this.writer = null;
    }

    CheckpointStorageWorkerView getStreamFactoryResolver() throws IOException {
        if (this.streamFactoryResolver == null) {
            this.streamFactoryResolver = this.checkpointStorageWorkerViewSupplier.get();
        }
        return this.streamFactoryResolver;
    }
}
