package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskExternallyInducedSourceInput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.class */
public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, ?>> {
    private AsyncDataOutputToOutput<T> output;
    private boolean isExternallyInducedSource;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask$AsyncDataOutputToOutput.class */
    public static class AsyncDataOutputToOutput<T> extends AbstractDataOutput<T> {
        private final Output<StreamRecord<T>> output;
        private final Counter numRecordsOut;

        @Nullable
        private final WatermarkGauge inputWatermarkGauge;

        public AsyncDataOutputToOutput(Output<StreamRecord<T>> output, StreamStatusMaintainer streamStatusMaintainer, Counter counter, @Nullable WatermarkGauge watermarkGauge) {
            super(streamStatusMaintainer);
            this.output = (Output) Preconditions.checkNotNull(output);
            this.numRecordsOut = counter;
            this.inputWatermarkGauge = watermarkGauge;
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitRecord(StreamRecord<T> streamRecord) {
            this.numRecordsOut.inc();
            this.output.collect(streamRecord);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            this.output.emitLatencyMarker(latencyMarker);
        }

        @Override // org.apache.flink.streaming.runtime.io.PushingAsyncDataInput.DataOutput
        public void emitWatermark(Watermark watermark) {
            if (this.inputWatermarkGauge != null) {
                this.inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            }
            this.output.emitWatermark(watermark);
        }
    }

    public SourceOperatorStreamTask(Environment environment) throws Exception {
        super(environment);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public CompletableFuture<Void> getCompletionFuture() {
        return super.getCompletionFuture();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void init() throws Exception {
        StreamTaskSourceInput streamTaskSourceInput;
        SourceOperator sourceOperator = (SourceOperator) this.mainOperator;
        sourceOperator.initReader();
        if (sourceOperator.getSourceReader() instanceof ExternallyInducedSourceReader) {
            this.isExternallyInducedSource = true;
            streamTaskSourceInput = new StreamTaskExternallyInducedSourceInput(sourceOperator, (v1) -> {
                triggerCheckpointForExternallyInducedSource(v1);
            }, 0, 0);
        } else {
            streamTaskSourceInput = new StreamTaskSourceInput(sourceOperator, 0, 0);
        }
        this.output = new AsyncDataOutputToOutput<>(this.operatorChain.getMainOperatorOutput(), getStreamStatusMaintainer(), ((OperatorMetricGroup) sourceOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter(), null);
        this.inputProcessor = new StreamOneInputProcessor(streamTaskSourceInput, this.output, this.operatorChain);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void finishTask() throws Exception {
        this.mailboxProcessor.allActionsCompleted();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask, org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        return !this.isExternallyInducedSource ? super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions) : CompletableFuture.completedFuture(Boolean.valueOf(isRunning()));
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void advanceToEndOfEventTime() {
        this.output.emitWatermark(Watermark.MAX_WATERMARK);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void afterInvoke() throws Exception {
        if (!isCanceled()) {
            advanceToEndOfEventTime();
        }
        super.afterInvoke();
    }

    private void triggerCheckpointForExternallyInducedSource(long j) {
        super.triggerCheckpointAsync(new CheckpointMetaData(j, System.currentTimeMillis()), CheckpointOptions.forConfig(CheckpointType.CHECKPOINT, CheckpointStorageLocationReference.getDefault(), this.configuration.isExactlyOnceCheckpointMode(), this.configuration.isUnalignedCheckpointsEnabled(), this.configuration.getAlignmentTimeout().toMillis()));
    }
}
