package org.apache.flink.streaming.api.operators.async;

import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
import org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue;
import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/**
 * Stream operator that hands every incoming record to an {@link AsyncFunction} and keeps the
 * in-flight elements in a bounded {@link StreamElementQueue}.
 *
 * <p>The queue implementation is chosen from the configured {@link AsyncDataStream.OutputMode}
 * (ordered or unordered). An {@link Emitter} is run on a dedicated daemon thread and is given the
 * checkpoint lock, the operator output and the queue.
 *
 * <p>On snapshot, every element currently in the queue (plus the element that is waiting to be
 * inserted, if any) is written to operator list state; on restore these elements are pushed
 * through the operator again in {@link #open()}.
 *
 * <p>Methods that insert into the queue expect the caller to hold the checkpoint lock (this is
 * asserted), because they use {@code wait()} on that lock while the queue is full.
 *
 * @param <IN> type of the input records
 * @param <OUT> type of the records produced by the async function
 */
@Internal
public class AsyncWaitOperator<IN, OUT> extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT>, OperatorActions {
    private static final long serialVersionUID = 1L;

    /** Name under which the in-flight stream elements are stored in operator state. */
    private static final String STATE_NAME = "_async_wait_operator_state_";

    /** Maximum number of entries handed to the stream element queue. */
    private final int capacity;

    /** Selects the ordered or the unordered queue implementation. */
    private final AsyncDataStream.OutputMode outputMode;

    /** Timeout for a single async invocation; a timer is only registered when this is &gt; 0. */
    private final long timeout;

    /** Checkpoint lock of the containing task, obtained in {@link #setup}. */
    protected transient Object checkpointingLock;

    /** Serializer used for the elements written to / read from operator state. */
    private transient StreamElementSerializer<IN> inStreamElementSerializer;

    /** Elements read back from state; consumed and reset to {@code null} in {@link #open()}. */
    private transient ListState<StreamElement> recoveredStreamElements;

    /** Buffer of in-flight elements. */
    private transient StreamElementQueue queue;

    /** Entry that is currently waiting to be inserted into a full queue, or {@code null}. */
    private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;

    /** Executor passed to the stream element queue. */
    private transient ExecutorService executor;

    /** Runnable executed by {@link #emitterThread}. */
    private transient Emitter<OUT> emitter;

    /** Daemon thread running the {@link #emitter}. */
    private transient Thread emitterThread;

    /**
     * Creates the operator.
     *
     * @param asyncFunction user function invoked for every record
     * @param timeout timeout added to the current processing time when registering the timeout
     *     timer; values &lt;= 0 disable the timer
     * @param capacity capacity of the stream element queue, must be greater than 0
     * @param outputMode output mode, must not be {@code null}
     * @throws IllegalArgumentException if {@code capacity} is not positive
     * @throws NullPointerException if {@code outputMode} is {@code null}
     */
    public AsyncWaitOperator(AsyncFunction<IN, OUT> asyncFunction, long timeout, int capacity, AsyncDataStream.OutputMode outputMode) {
        super(asyncFunction);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
        this.capacity = capacity;
        this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");
        this.timeout = timeout;
    }

    /**
     * Captures the checkpoint lock, builds the input element serializer, creates the executor and
     * instantiates the queue matching the configured output mode.
     *
     * @throws IllegalStateException if the output mode is neither ORDERED nor UNORDERED
     */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        super.setup(streamTask, streamConfig, output);
        this.checkpointingLock = getContainingTask().getCheckpointLock();
        this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
        this.executor = Executors.newSingleThreadExecutor();
        switch (this.outputMode) {
            case ORDERED:
                this.queue = new OrderedStreamElementQueue(this.capacity, this.executor, this);
                break;
            case UNORDERED:
                this.queue = new UnorderedStreamElementQueue(this.capacity, this.executor, this);
                break;
            default:
                throw new IllegalStateException("Unknown async mode: " + this.outputMode + '.');
        }
    }

    /**
     * Starts the emitter thread and then re-processes the elements recovered from state.
     *
     * <p>The emitter must be running before the recovered elements are replayed: a snapshot may
     * contain the full queue plus one pending entry, and inserting into a full queue blocks on the
     * checkpoint lock until something removes an entry. Replaying first would therefore block
     * forever on restore.
     *
     * @throws IllegalStateException if a recovered element is neither a record, a watermark nor a
     *     latency marker
     */
    @Override
    public void open() throws Exception {
        super.open();

        this.emitter = new Emitter<>(this.checkpointingLock, this.output, this.queue, this);
        this.emitterThread = new Thread(this.emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
        this.emitterThread.setDaemon(true);
        this.emitterThread.start();

        if (this.recoveredStreamElements != null) {
            for (StreamElement element : this.recoveredStreamElements.get()) {
                if (element.isRecord()) {
                    processElement(element.<IN>asRecord());
                } else if (element.isWatermark()) {
                    processWatermark(element.asWatermark());
                } else if (element.isLatencyMarker()) {
                    processLatencyMarker(element.asLatencyMarker());
                } else {
                    throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
                }
            }
            this.recoveredStreamElements = null;
        }
    }

    /**
     * Wraps the record in a queue entry, optionally registers a processing-time timeout for it,
     * enqueues the entry and finally invokes the user's async function with the entry as the
     * result collector.
     */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        final StreamRecordQueueEntry<OUT> entry = new StreamRecordQueueEntry<>(streamRecord);

        if (this.timeout > 0) {
            long timeoutTimestamp = this.timeout + getProcessingTimeService().getCurrentProcessingTime();
            // NOTE(review): the registered timer is never cancelled here, so it also fires for
            // entries that completed in time; presumably collect() on a completed entry is
            // harmless - confirm against StreamRecordQueueEntry.
            getProcessingTimeService().registerTimer(timeoutTimestamp, new ProcessingTimeCallback() {
                @Override
                public void onProcessingTime(long timestamp) throws Exception {
                    entry.collect(new TimeoutException("Async function call has timed out."));
                }
            });
        }

        addAsyncBufferEntry(entry);
        this.userFunction.asyncInvoke(streamRecord.getValue(), entry);
    }

    /** Enqueues the watermark as its own queue entry. */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        addAsyncBufferEntry(new WatermarkQueueEntry(watermark));
    }

    /**
     * Writes all queued stream elements, followed by the pending entry (if any), to operator list
     * state. The state is cleared before writing and again if writing fails.
     *
     * @throws Exception wrapping the original failure if the elements could not be stored
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        ListState<StreamElement> operatorState = getOperatorStateBackend().getOperatorState(
                new ListStateDescriptor<StreamElement>(STATE_NAME, this.inStreamElementSerializer));
        operatorState.clear();

        try {
            for (StreamElementQueueEntry<?> queued : this.queue.values()) {
                operatorState.add(queued.getStreamElement());
            }
            // The operator may be blocked while inserting an entry into a full queue; that entry
            // is not part of queue.values() yet and has to be stored separately.
            if (this.pendingStreamElementQueueEntry != null) {
                operatorState.add(this.pendingStreamElementQueueEntry.getStreamElement());
            }
        } catch (Exception e) {
            operatorState.clear();
            throw new Exception("Could not add stream element queue entries to operator state backend of operator " + getOperatorName() + '.', e);
        }
    }

    /**
     * Lets the superclass initialise its state and then looks up the list state holding the
     * in-flight elements of a previous execution. The elements are replayed in {@link #open()}.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        // Mirrors snapshotState(), which also delegates to the superclass first.
        super.initializeState(stateInitializationContext);
        this.recoveredStreamElements = stateInitializationContext.getOperatorStateStore().getOperatorState(
                new ListStateDescriptor<StreamElement>(STATE_NAME, this.inStreamElementSerializer));
    }

    /**
     * Waits until the queue is empty, then closes the superclass and stops the emitter, the
     * emitter thread and the executor. Errors raised while closing are logged, not rethrown.
     *
     * <p>Must be called while holding the checkpoint lock (asserted); the lock is released while
     * waiting for the queue to drain.
     */
    @Override
    public void close() throws Exception {
        try {
            assert Thread.holdsLock(this.checkpointingLock);

            while (!this.queue.isEmpty()) {
                // wait() releases the checkpoint lock until this thread is notified.
                this.checkpointingLock.wait();
            }
        } finally {
            Exception failure = null;

            try {
                super.close();
            } catch (InterruptedException interrupted) {
                failure = interrupted;
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                failure = e;
            }

            try {
                stopResources(true);
            } catch (InterruptedException interrupted) {
                failure = ExceptionUtils.firstOrSuppressed(interrupted, failure);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                failure = ExceptionUtils.firstOrSuppressed(e, failure);
            }

            if (failure != null) {
                LOG.warn("Errors occurred while closing the AsyncWaitOperator.", failure);
            }
        }
    }

    /**
     * Disposes the superclass and stops all resources without waiting for them to finish.
     *
     * @throws Exception the first failure encountered, with later ones attached as suppressed
     */
    @Override
    public void dispose() throws Exception {
        Exception failure = null;

        try {
            super.dispose();
        } catch (InterruptedException interrupted) {
            failure = interrupted;
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            failure = e;
        }

        try {
            stopResources(false);
        } catch (InterruptedException interrupted) {
            failure = ExceptionUtils.firstOrSuppressed(interrupted, failure);
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            failure = ExceptionUtils.firstOrSuppressed(e, failure);
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Stops the emitter, interrupts its thread and shuts down the executor.
     *
     * <p>Each resource is checked for {@code null} because this method is also reached from
     * {@link #dispose()}, which may run although {@link #open()} never created the emitter.
     * Without the checks a {@link NullPointerException} would skip the executor shutdown.
     *
     * @param waitForShutdown if {@code true}, waits for the executor to terminate and for the
     *     emitter thread to die; if {@code false}, the executor is shut down immediately
     * @throws InterruptedException if interrupted while waiting for the emitter thread
     */
    private void stopResources(boolean waitForShutdown) throws InterruptedException {
        if (this.emitter != null) {
            this.emitter.stop();
        }
        if (this.emitterThread != null) {
            this.emitterThread.interrupt();
        }

        if (this.executor != null) {
            this.executor.shutdown();

            if (!waitForShutdown) {
                this.executor.shutdownNow();
            } else {
                try {
                    if (!this.executor.awaitTermination(365L, TimeUnit.DAYS)) {
                        this.executor.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    this.executor.shutdownNow();
                    Thread.currentThread().interrupt();
                }
            }
        }

        if (waitForShutdown && this.emitterThread != null) {
            // If this thread owns the checkpoint lock, release it periodically via wait() so the
            // emitter thread can acquire it and terminate.
            if (Thread.holdsLock(this.checkpointingLock)) {
                while (this.emitterThread.isAlive()) {
                    this.checkpointingLock.wait(100L);
                }
            }
            this.emitterThread.join();
        }
    }

    /**
     * Inserts the entry into the queue, blocking on the checkpoint lock while the queue is full.
     * While blocked, the entry is exposed as the pending entry so that a snapshot includes it.
     *
     * <p>The caller must hold the checkpoint lock (asserted).
     *
     * @throws InterruptedException if interrupted while waiting for free queue capacity
     */
    private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
        assert Thread.holdsLock(this.checkpointingLock);

        this.pendingStreamElementQueueEntry = streamElementQueueEntry;
        while (!this.queue.tryPut(streamElementQueueEntry)) {
            // Queue is full: release the checkpoint lock until notified, then retry.
            this.checkpointingLock.wait();
        }
        this.pendingStreamElementQueueEntry = null;
    }

    /** Fails the containing task externally with the given cause. */
    @Override
    public void failOperator(Throwable th) {
        getContainingTask().getEnvironment().failExternally(th);
    }
}
