package org.apache.flink.runtime.asyncprocessing;

import java.util.LinkedList;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/EpochManager.class */
/**
 * Groups incoming records into consecutive {@link Epoch}s that are delimited by non-record
 * inputs.
 *
 * <p>Every record is counted against the currently active epoch. When a non-record arrives, the
 * active epoch is closed with an action, appended to the output queue, and a fresh epoch becomes
 * active. An epoch finishes (and its action runs) once it is closed and none of its records are
 * still ongoing. Epochs are finished strictly from the head of the output queue, so actions run
 * in the order in which their epochs were closed.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its state; it presumably relies
 * on being used from a single thread — confirm against callers.
 */
public class EpochManager {
    private static final Logger LOG = LoggerFactory.getLogger(EpochManager.class);

    /** Controller asked to drain in-flight records in {@link ParallelMode#SERIAL_BETWEEN_EPOCH}. */
    final AsyncExecutionController<?> asyncExecutionController;

    /** Id that will be assigned to the next epoch created. */
    long epochNum;

    /** Closed epochs waiting to finish, oldest first. */
    LinkedList<Epoch> outputQueue = new LinkedList<>();

    /** The epoch that currently receives new records. */
    Epoch activeEpoch;

    /**
     * Set while {@link #tryFinishInQueue()} is draining the queue. An epoch action may call back
     * into this manager; the flag makes such nested drain requests no-ops so that only the
     * outermost loop pops from the queue.
     */
    private boolean finishingInQueue = false;

    /** A group of records together with the action to run once all of them have completed. */
    public static class Epoch {
        /** Identifier of this epoch. */
        long id;

        /** Number of records of this epoch that have not completed yet. */
        int ongoingRecordCount = 0;

        /** Lifecycle state, starts as {@link EpochStatus#OPEN}. */
        EpochStatus status = EpochStatus.OPEN;

        /** Action to run when the epoch finishes; set by {@link #close(Runnable)}. */
        @Nullable
        Runnable action = null;

        public Epoch(long j) {
            this.id = j;
        }

        /**
         * Finishes this epoch if it is closed and has no ongoing records, running the action (if
         * any) exactly once at the transition to {@link EpochStatus#FINISHED}.
         *
         * @return {@code true} if the epoch is finished after this call (including when it was
         *     already finished before), {@code false} if it cannot finish yet.
         */
        boolean tryFinish() {
            if (status == EpochStatus.FINISHED) {
                return true;
            }
            if (ongoingRecordCount == 0 && status == EpochStatus.CLOSED) {
                // Flip the status before running the action so the action is never run twice.
                status = EpochStatus.FINISHED;
                if (action != null) {
                    action.run();
                }
                return true;
            }
            return false;
        }

        /**
         * Closes this epoch and remembers the action to run when it finishes.
         *
         * @param runnable the action to run on finish, may be {@code null}.
         */
        void close(Runnable runnable) {
            this.action = runnable;
            this.status = EpochStatus.CLOSED;
        }

        @Override
        public String toString() {
            return String.format(
                    "Epoch{id=%d, ongoingRecord=%d, status=%s}", id, ongoingRecordCount, status);
        }
    }

    /** Lifecycle states of an {@link Epoch}. */
    public enum EpochStatus {
        /** Initial state of a newly created epoch. */
        OPEN,
        /** Set by {@link Epoch#close(Runnable)}; the epoch may finish once its count is zero. */
        CLOSED,
        /** Set by {@link Epoch#tryFinish()} right before the action is run. */
        FINISHED
    }

    /** How a non-record input relates to the records that are still in flight. */
    public enum ParallelMode {
        /** In-flight records are drained via the controller right after the epoch switch. */
        SERIAL_BETWEEN_EPOCH,
        /** No draining is requested by this class after the epoch switch. */
        PARALLEL_BETWEEN_EPOCH
    }

    /**
     * Creates a manager whose first active epoch has id 0.
     *
     * @param asyncExecutionController controller used to drain in-flight records.
     */
    public EpochManager(AsyncExecutionController<?> asyncExecutionController) {
        this.epochNum = 0L;
        this.asyncExecutionController = asyncExecutionController;
        this.activeEpoch = new Epoch(this.epochNum++);
    }

    /**
     * Registers a new record with the active epoch.
     *
     * @return the epoch the record was counted against; pass it to {@link
     *     #completeOneRecord(Epoch)} when the record is done.
     */
    public Epoch onRecord() {
        activeEpoch.ongoingRecordCount++;
        return activeEpoch;
    }

    /**
     * Handles a non-record input: closes the active epoch with the given action and starts a new
     * one.
     *
     * @param runnable action to run once the closed epoch finishes.
     * @param parallelMode if {@link ParallelMode#SERIAL_BETWEEN_EPOCH}, in-flight records are
     *     drained through the controller before returning.
     */
    public void onNonRecord(Runnable runnable, ParallelMode parallelMode) {
        LOG.trace(
                "on NonRecord, old epoch: {}, outputQueue size: {}",
                activeEpoch,
                outputQueue.size());
        switchActiveEpoch(runnable);
        if (parallelMode == ParallelMode.SERIAL_BETWEEN_EPOCH) {
            asyncExecutionController.drainInflightRecords(0);
        }
    }

    /**
     * Marks one record of the given epoch as completed and, if it was the last ongoing one,
     * tries to finish queued epochs.
     *
     * @param epoch the epoch previously returned by {@link #onRecord()} for this record.
     */
    public void completeOneRecord(Epoch epoch) {
        if (--epoch.ongoingRecordCount == 0) {
            tryFinishInQueue();
        }
    }

    /**
     * Finishes and removes epochs from the head of the output queue for as long as the head can
     * finish.
     *
     * <p>An epoch's action runs inside {@link Epoch#tryFinish()} while that epoch is still the
     * head of the queue. If the action calls back into this manager, a nested drain would see
     * the head as already finished and pop it, after which the outer loop would pop a second
     * time — throwing on an empty queue or dropping an epoch whose action never ran. Nested
     * calls therefore return immediately; the outermost loop re-checks the queue after each
     * action and picks up whatever became finishable in the meantime.
     */
    private void tryFinishInQueue() {
        if (finishingInQueue) {
            return;
        }
        finishingInQueue = true;
        try {
            // Loop because several consecutive epochs may be ready to finish.
            while (!outputQueue.isEmpty() && outputQueue.peek().tryFinish()) {
                LOG.trace(
                        "Finish epoch: {}, outputQueue size: {}",
                        outputQueue.peek(),
                        outputQueue.size());
                outputQueue.pop();
            }
        } finally {
            // Always release the guard, even if an action throws.
            finishingInQueue = false;
        }
    }

    /**
     * Closes the active epoch with the given action, enqueues it, activates a new epoch and
     * tries to finish queued epochs.
     */
    private void switchActiveEpoch(Runnable runnable) {
        activeEpoch.close(runnable);
        outputQueue.offer(activeEpoch);
        activeEpoch = new Epoch(epochNum++);
        tryFinishInQueue();
    }
}
