package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.TimestampedInputSplit;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.class */
public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<T, OUT>, OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
    /** Format used to open splits and read records; set in the constructor, released in close(). */
    private transient InputFormat<OUT, ? super T> format;
    /** Serializer of the output type; assigned by setOutputType() and required by open(). */
    private TypeSerializer<OUT> serializer;
    /** Executor on which {@link #processRecordAction} is scheduled by enqueueProcessRecord(). */
    private transient MailboxExecutorImpl executor;
    /** Record instance created in open() and handed to format.nextRecord() on every read. */
    private transient OUT reusedRecord;
    /** Context through which records are collected and the final watermark is emitted. */
    private transient SourceFunction.SourceContext<OUT> sourceContext;
    /** Operator list state that snapshotState() fills with the current and pending splits. */
    private transient ListState<T> checkpointedState;
    /** Split currently loaded into the format, or null when no split is loaded. */
    private transient T currentSplit;
    /** Counter registered as "numSplitsProcessed"; incremented once per finished split. */
    private transient Counter completedSplitsCounter;
    /** Current life-cycle state of the reader; reset to IDLE in open(). */
    private transient ReaderState state = ReaderState.IDLE;
    /** Splits received (or restored) but not yet loaded; consumed with poll(). */
    private transient PriorityQueue<T> splits = new PriorityQueue<>();
    /**
     * Action submitted to the executor: runs processRecord() and, if it throws, moves the
     * reader to FAILED before propagating the exception unchanged.
     */
    private final transient RunnableWithException processRecordAction = () -> {
        try {
            processRecord();
        } catch (Exception e) {
            // record the failure in the state machine first, then let the caller see the error
            switchState(ReaderState.FAILED);
            throw e;
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator$ReaderState.class */
    /**
     * Life-cycle states of the reader together with the transitions allowed between them.
     *
     * <p>Each state decides, via {@link #prepareToProcessRecord}, whether the operator may read
     * a record right now, and reacts via {@link #onNoMoreData} when a split was finished and no
     * further split is queued.
     */
    public enum ReaderState {
        /** No split is being read; asking to process a record in this state is a programming error. */
        IDLE {
            @Override
            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) throws IOException {
                throw new IllegalStateException("not processing any records in IDLE state");
            }
        },
        /** A processing mail is scheduled; the next queued split (if any) is loaded on demand. */
        OPENING {
            @Override
            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) throws IOException {
                if (op.splits.isEmpty()) {
                    // nothing queued after all: fall back to IDLE and do not read
                    op.switchState(ReaderState.IDLE);
                    return false;
                }
                op.loadSplit(op.splits.poll());
                op.switchState(ReaderState.READING);
                return true;
            }
        },
        /** A split is loaded and records are being read from it. */
        READING {
            @Override
            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) throws IOException {
                return true;
            }

            @Override
            public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {
                op.switchState(ReaderState.IDLE);
            }
        },
        /** Processing a record threw an exception; no further records may be processed. */
        FAILED {
            @Override
            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) throws IOException {
                throw new IllegalStateException("not processing any records in FAILED state");
            }
        },
        /** finish() was requested while work was outstanding; remaining splits are drained. */
        FINISHING {
            @Override
            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) throws IOException {
                // load the next queued split only when none is currently loaded
                if (op.currentSplit == null && !op.splits.isEmpty()) {
                    op.loadSplit(op.splits.poll());
                }
                return true;
            }

            @Override
            public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {
                // The mail must be enqueued BEFORE switching: enqueueProcessRecord() rejects
                // terminal states. Presumably this extra mail wakes up the yield() loop in
                // finish() - TODO confirm against MailboxExecutor semantics.
                op.enqueueProcessRecord();
                op.switchState(ReaderState.FINISHED);
            }
        },
        /** Terminal state; a stray processing mail is logged and ignored. */
        FINISHED {
            @Override
            public <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) {
                ContinuousFileReaderOperator.LOG.warn("not processing any records while closed");
                return false;
            }
        };

        /** For every state, the set of states it may switch to. */
        private static final Map<ReaderState, Set<ReaderState>> VALID_TRANSITIONS;
        /** States in which processElement() may add new splits. */
        private static final Set<ReaderState> ACCEPT_SPLITS = EnumSet.of(IDLE, OPENING, READING);

        /** @return true if new splits may be handed to the operator in this state */
        public boolean isAcceptingSplits() {
            return ACCEPT_SPLITS.contains(this);
        }

        /** @return true only for {@link #FINISHED} */
        public final boolean isTerminal() {
            return this == FINISHED;
        }

        /**
         * @param readerState the desired target state
         * @return true if a transition from this state to {@code readerState} is allowed
         */
        public boolean canSwitchTo(ReaderState readerState) {
            final Set<ReaderState> targets = VALID_TRANSITIONS.get(this);
            return targets != null && targets.contains(readerState);
        }

        /**
         * Prepares the operator for reading one record, loading a split and/or switching state
         * where the concrete state requires it.
         *
         * @param op the operator to prepare
         * @return true if a record may be read now, false if processing should stop
         * @throws IOException if loading a split fails
         * @throws IllegalStateException in states that never process records (IDLE, FAILED)
         */
        public abstract <T extends TimestampedInputSplit> boolean prepareToProcessRecord(ContinuousFileReaderOperator<?, T> op) throws IOException;

        /**
         * Called after a split was finished and no further split is queued. Does nothing unless
         * a state overrides it.
         *
         * @param op the operator that ran out of splits
         */
        public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {
        }

        static {
            final Map<ReaderState, Set<ReaderState>> transitions = new EnumMap<>(ReaderState.class);
            transitions.put(IDLE, EnumSet.of(OPENING, FINISHED, FAILED));
            transitions.put(OPENING, EnumSet.of(READING, FINISHING, FAILED));
            transitions.put(READING, EnumSet.of(IDLE, OPENING, FINISHING, FAILED));
            transitions.put(FINISHING, EnumSet.of(FINISHED, FAILED));
            transitions.put(FAILED, EnumSet.of(FINISHED));
            transitions.put(FINISHED, EnumSet.noneOf(ReaderState.class));
            VALID_TRANSITIONS = transitions;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ContinuousFileReaderOperator(InputFormat<OUT, ? super T> inputFormat, ProcessingTimeService processingTimeService, MailboxExecutor mailboxExecutor) {
        this.format = (InputFormat) Preconditions.checkNotNull(inputFormat);
        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
        this.executor = (MailboxExecutorImpl) Preconditions.checkNotNull(mailboxExecutor);
    }

    /**
     * Registers the "splits" operator list state and, when the job is restored, re-queues every
     * checkpointed split.
     *
     * @param stateInitializationContext context giving access to the operator state store
     * @throws IllegalStateException if the state was already initialized
     * @throws Exception if the superclass initialization or the state access fails
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        Preconditions.checkState(this.checkpointedState == null, "The reader state has already been initialized.");

        this.checkpointedState = stateInitializationContext
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));

        final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        if (!stateInitializationContext.isRestored()) {
            LOG.info("No state to restore for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
            return;
        }

        LOG.info("Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIndex);
        // close() nulls the queue, so recreate it if necessary before adding restored splits
        if (this.splits == null) {
            this.splits = new PriorityQueue<>();
        }
        for (T restoredSplit : this.checkpointedState.get()) {
            this.splits.add(restoredSplit);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) restored {}.", getClass().getSimpleName(), subtaskIndex, this.splits);
        }
    }

    /**
     * Prepares the operator for reading: configures the format, creates the source context, the
     * reusable record and the split counter, and schedules processing when splits are already
     * queued (for example after a restore).
     *
     * @throws IllegalStateException if setOutputType() has not supplied a serializer
     * @throws Exception if the superclass open() fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        Preconditions.checkState(this.serializer != null, "The serializer has not been set. Probably the setOutputType() was not called. Please report it.");

        this.state = ReaderState.IDLE;

        if (this.format instanceof RichInputFormat) {
            ((RichInputFormat<?, ?>) this.format).setRuntimeContext(getRuntimeContext());
        }
        this.format.configure(new Configuration());

        this.sourceContext =
                StreamSourceContexts.getSourceContext(
                        getOperatorConfig().getTimeCharacteristic(),
                        getProcessingTimeService(),
                        new Object(),
                        this.output,
                        getExecutionConfig().getAutoWatermarkInterval(),
                        -1L,
                        true);

        // NOTE(review): "createInstance2" looks like a decompiler-renamed createInstance() -
        // confirm against the TypeSerializer declaration used by this code base.
        this.reusedRecord = this.serializer.createInstance2();
        this.completedSplitsCounter = getMetricGroup().counter("numSplitsProcessed");

        if (this.splits == null) {
            this.splits = new PriorityQueue<>();
        }
        if (!this.splits.isEmpty()) {
            // splits were queued before open() ran, so start reading right away
            enqueueProcessRecord();
        }
    }

    /**
     * Queues an incoming split and, if the reader is idle, schedules record processing.
     *
     * @param streamRecord record carrying the split to read
     * @throws IllegalStateException if the current state does not accept splits
     */
    @Override
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        Preconditions.checkState(this.state.isAcceptingSplits());
        final T incomingSplit = streamRecord.getValue();
        this.splits.offer(incomingSplit);
        if (ReaderState.IDLE == this.state) {
            enqueueProcessRecord();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits the record-processing action to the executor and moves an idle reader to OPENING.
     *
     * @throws IllegalStateException if the reader is in a terminal state
     */
    public void enqueueProcessRecord() {
        final boolean stillActive = !this.state.isTerminal();
        Preconditions.checkState(stillActive, "can't enqueue mail in terminal state %s", this.state);

        this.executor.execute(this.processRecordAction, "ContinuousFileReaderOperator");

        if (ReaderState.IDLE == this.state) {
            switchState(ReaderState.OPENING);
        }
    }

    /**
     * Reads records one at a time for as long as the current state allows it, the split has more
     * data and the executor reports being idle. When the executor is no longer idle, the work is
     * re-enqueued instead of continuing inline.
     *
     * @throws IOException if preparing the state or reading from the format fails
     */
    private void processRecord() throws IOException {
        do {
            if (!this.state.prepareToProcessRecord(this)) {
                return;
            }

            readAndCollectRecord();

            if (this.format.reachedEnd()) {
                onSplitProcessed();
                return;
            }
        } while (this.executor.isIdle());

        // the executor is busy: give way and continue from a fresh mail
        enqueueProcessRecord();
    }

    /**
     * Finishes the current split: counts it, closes the format, clears the current split and
     * then either notifies the state that no data is left or schedules the next split.
     *
     * @throws IOException if closing the format fails
     */
    private void onSplitProcessed() throws IOException {
        this.completedSplitsCounter.inc();
        LOG.debug("split {} processed: {}", this.completedSplitsCounter.getCount(), this.currentSplit);
        this.format.close();
        this.currentSplit = null;

        if (!this.splits.isEmpty()) {
            // more splits are waiting: a READING reader goes back to OPENING for the next one
            if (ReaderState.READING == this.state) {
                switchState(ReaderState.OPENING);
            }
            enqueueProcessRecord();
        } else {
            this.state.onNoMoreData(this);
        }
    }

    /**
     * Reads the next record from the format, if any, and emits it through the source context.
     * Nothing is emitted when the format is at its end or returns null.
     *
     * @throws IllegalStateException if the reader is neither READING nor FINISHING
     * @throws IOException if reading from the format fails
     */
    private void readAndCollectRecord() throws IOException {
        final boolean mayRead = this.state == ReaderState.READING || this.state == ReaderState.FINISHING;
        Preconditions.checkState(mayRead, "can't process record in state %s", this.state);

        if (!this.format.reachedEnd()) {
            final OUT record = this.format.nextRecord(this.reusedRecord);
            if (record != null) {
                this.sourceContext.collect(record);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes the given split the current one and opens the format on it. A checkpointable format
     * is reopened from the split's saved state when such state exists; afterwards the saved
     * state on the split is reset.
     *
     * @param t the split to load; must not be null
     * @throws IllegalStateException if the reader is READING or FINISHED
     * @throws IOException if opening the format fails
     */
    public void loadSplit(T t) throws IOException {
        final boolean mayLoad = this.state != ReaderState.READING && this.state != ReaderState.FINISHED;
        Preconditions.checkState(mayLoad, "can't load split in state %s", this.state);
        Preconditions.checkNotNull(t, "split is null");

        LOG.debug("load split: {}", t);
        this.currentSplit = t;

        if (this.format instanceof RichInputFormat) {
            ((RichInputFormat<?, ?>) this.format).openInputFormat();
        }

        final boolean resumeFromSavedState =
                this.format instanceof CheckpointableInputFormat && this.currentSplit.getSplitState() != null;
        if (resumeFromSavedState) {
            ((CheckpointableInputFormat) this.format).reopen(this.currentSplit, this.currentSplit.getSplitState());
        } else {
            this.format.open(this.currentSplit);
        }

        this.currentSplit.resetSplitState();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Moves the reader to the given state; a request for the state it is already in is a no-op.
     *
     * @param readerState the target state
     * @throws IllegalStateException if the transition is not allowed from the current state
     */
    public void switchState(ReaderState readerState) {
        if (this.state == readerState) {
            return;
        }
        final boolean transitionAllowed = this.state.canSwitchTo(readerState);
        Preconditions.checkState(transitionAllowed, "can't switch state from terminal state %s to %s", this.state, readerState);
        LOG.debug("switch state: {} -> {}", this.state, readerState);
        this.state = readerState;
    }

    /**
     * Ignores the incoming watermark: the body is intentionally empty and does not delegate to
     * the superclass. This operator emits {@code Watermark.MAX_WATERMARK} itself from finish().
     *
     * @param watermark the incoming watermark (unused)
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.Input
    public void processWatermark(Watermark watermark) throws Exception {
        // intentionally empty
    }

    /**
     * Finishes the operator. An idle reader is finished immediately; an already finished reader
     * only logs a warning; otherwise the reader switches to FINISHING and yields to the executor
     * until the terminal state is reached. Finally a MAX_WATERMARK is emitted on a best-effort
     * basis (failures are logged, not propagated).
     *
     * @throws Exception if the superclass finish() or the draining of remaining work fails
     */
    @Override
    public void finish() throws Exception {
        LOG.debug("finishing");
        super.finish();

        final ReaderState stateAtFinish = this.state;
        if (stateAtFinish == ReaderState.FINISHED) {
            LOG.warn("operator is already closed, doing nothing");
            return;
        }

        if (stateAtFinish == ReaderState.IDLE) {
            switchState(ReaderState.FINISHED);
        } else {
            switchState(ReaderState.FINISHING);
            // let the executor run the queued processing mails until everything is drained
            while (!this.state.isTerminal()) {
                this.executor.yield();
            }
        }

        try {
            this.sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
        } catch (Exception watermarkFailure) {
            LOG.warn("unable to emit watermark while closing", watermarkFailure);
        }
    }

    /**
     * Releases all resources: runs cleanUp(), drops every reference held by the operator and
     * then closes the superclass. Failures from cleanUp() and super.close() are merged with
     * {@code ExceptionUtils.firstOrSuppressed} and thrown only after both steps have run.
     *
     * @throws Exception the merged failure, if any step failed
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;

        try {
            cleanUp();
        } catch (Exception cleanUpFailure) {
            failure = cleanUpFailure;
        }

        // drop references regardless of whether cleanUp() succeeded
        this.checkpointedState = null;
        this.completedSplitsCounter = null;
        this.currentSplit = null;
        this.executor = null;
        this.format = null;
        this.sourceContext = null;
        this.reusedRecord = null;
        this.serializer = null;
        this.splits = null;

        try {
            super.close();
        } catch (Exception superCloseFailure) {
            failure = ExceptionUtils.firstOrSuppressed(superCloseFailure, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Closes the source context, the format and (for rich formats) the format's input-format
     * resources. Every step is attempted even if an earlier one fails; failures are merged with
     * {@code ExceptionUtils.firstOrSuppressed} and thrown at the end.
     *
     * @throws Exception the merged failure, if any step failed
     */
    private void cleanUp() throws Exception {
        LOG.debug("cleanup, state={}", this.state);

        final RunnableWithException closeSourceContext = () -> this.sourceContext.close();
        final RunnableWithException closeFormat = () -> this.format.close();
        final RunnableWithException closeRichFormat = () -> {
            if (this.format instanceof RichInputFormat) {
                ((RichInputFormat<?, ?>) this.format).closeInputFormat();
            }
        };
        final RunnableWithException[] steps = {closeSourceContext, closeFormat, closeRichFormat};

        Exception failure = null;
        for (RunnableWithException step : steps) {
            try {
                step.run();
            } catch (Exception stepFailure) {
                failure = ExceptionUtils.firstOrSuppressed(stepFailure, failure);
            }
        }

        this.currentSplit = null;

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Writes the current and pending splits into the operator list state. If the update fails,
     * the list state is cleared and the failure is rethrown wrapped in a new exception.
     *
     * @param stateSnapshotContext context of the snapshot being taken
     * @throws IllegalStateException if initializeState() has not run
     * @throws Exception if the superclass snapshot or the state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        Preconditions.checkState(this.checkpointedState != null, "The operator state has not been properly initialized.");

        final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        final List<T> splitsToCheckpoint = getReaderState();

        try {
            this.checkpointedState.update(splitsToCheckpoint);
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "{} (taskIdx={}) checkpointed {} splits: {}.",
                        getClass().getSimpleName(),
                        subtaskIndex,
                        splitsToCheckpoint.size(),
                        splitsToCheckpoint);
            }
        } catch (Exception updateFailure) {
            // do not leave a partially written state behind
            this.checkpointedState.clear();
            final String message =
                    "Could not add timestamped file input splits to operator state backend of operator " + getOperatorName() + '.';
            throw new Exception(message, updateFailure);
        }
    }

    /**
     * Collects the splits that must be checkpointed: the current split first (if one is loaded),
     * followed by all queued splits. While READING with a checkpointable format, the format's
     * current state is stored on the current split before it is added.
     *
     * @return a new list containing the current split (if any) and all pending splits
     * @throws IOException if the format cannot report its current state
     */
    private List<T> getReaderState() throws IOException {
        // one extra slot for the current split so the list never has to grow
        final List<T> snapshot = new ArrayList<>(this.splits.size() + 1);
        if (this.currentSplit != null) {
            if (this.format instanceof CheckpointableInputFormat && this.state == ReaderState.READING) {
                this.currentSplit.setSplitState(((CheckpointableInputFormat<?, ?>) this.format).getCurrentState());
            }
            snapshot.add(this.currentSplit);
        }
        snapshot.addAll(this.splits);
        return snapshot;
    }

    /**
     * Creates and stores the serializer for the output type; open() requires it to be set.
     *
     * @param typeInformation type information of the records produced by this operator
     * @param executionConfig execution config supplying the serializer configuration
     */
    @Override
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        final TypeSerializer<OUT> outputSerializer =
                typeInformation.createSerializer(executionConfig.getSerializerConfig());
        this.serializer = outputSerializer;
    }
}
