package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.operators.coordination.AcknowledgeCheckpointEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain.class */
public abstract class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements BoundedMultiInput, Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    protected final RecordWriterOutput<?>[] streamOutputs;
    protected final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;

    @Nullable
    protected final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;

    @Nullable
    protected final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

    @Nullable
    protected final StreamOperatorWrapper<?, ?> tailOperatorWrapper;
    protected final Map<StreamConfig.SourceInputConfig, ChainedSource> chainedSources;
    protected final int numOperators;
    protected final OperatorEventDispatcherImpl operatorEventDispatcher;
    protected final Closer closer;

    @Nullable
    protected final FinishedOnRestoreInput finishedOnRestoreInput;
    protected boolean isClosed;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainedSource.class */
    public static class ChainedSource {
        private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
        private final StreamTaskSourceInput<?> sourceTaskInput;

        public ChainedSource(WatermarkGaugeExposingOutput<StreamRecord<?>> watermarkGaugeExposingOutput, StreamTaskSourceInput<?> streamTaskSourceInput) {
            this.chainedSourceOutput = watermarkGaugeExposingOutput;
            this.sourceTaskInput = streamTaskSourceInput;
        }

        public WatermarkGaugeExposingOutput<StreamRecord<?>> getSourceOutput() {
            return this.chainedSourceOutput;
        }

        public StreamTaskSourceInput<?> getSourceTaskInput() {
            return this.sourceTaskInput;
        }
    }

    /* JADX WARN: Finally extract failed */
    /* JADX WARN: Multi-variable type inference failed */
    public OperatorChain(StreamTask<OUT, OP> streamTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
        this.closer = Closer.create();
        this.operatorEventDispatcher = new OperatorEventDispatcherImpl(streamTask.getEnvironment().getUserCodeClassLoader().asClassLoader(), streamTask.getEnvironment().getOperatorCoordinatorEventGateway());
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        StreamConfig configuration = streamTask.getConfiguration();
        StreamOperatorFactory streamOperatorFactory = configuration.getStreamOperatorFactory(userCodeClassLoader);
        Map<Integer, StreamConfig> transitiveChainedTaskConfigsWithSelf = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassLoader);
        List<NonChainedOutput> vertexNonChainedOutputs = configuration.getVertexNonChainedOutputs(userCodeClassLoader);
        Map<IntermediateDataSetID, RecordWriterOutput<?>> hashMap = new HashMap<>(vertexNonChainedOutputs.size());
        this.streamOutputs = new RecordWriterOutput[vertexNonChainedOutputs.size()];
        this.finishedOnRestoreInput = isTaskDeployedAsFinished() ? new FinishedOnRestoreInput(this.streamOutputs, configuration.getInputs(userCodeClassLoader).length) : null;
        boolean z = false;
        try {
            createChainOutputs(vertexNonChainedOutputs, recordWriterDelegate, transitiveChainedTaskConfigsWithSelf, streamTask, hashMap);
            List<StreamOperatorWrapper<?, ?>> arrayList = new ArrayList<>(transitiveChainedTaskConfigsWithSelf.size());
            this.mainOperatorOutput = createOutputCollector(streamTask, configuration, transitiveChainedTaskConfigsWithSelf, userCodeClassLoader, hashMap, arrayList, streamTask.getMailboxExecutorFactory());
            if (streamOperatorFactory != null) {
                Tuple2 createOperator = StreamOperatorFactoryUtil.createOperator(streamOperatorFactory, streamTask, configuration, this.mainOperatorOutput, this.operatorEventDispatcher);
                StreamOperator streamOperator = (StreamOperator) createOperator.f0;
                streamOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, (String) this.mainOperatorOutput.getWatermarkGauge());
                this.mainOperatorWrapper = createOperatorWrapper(streamOperator, streamTask, configuration, (Optional) createOperator.f1, true);
                arrayList.add(this.mainOperatorWrapper);
                this.tailOperatorWrapper = arrayList.get(0);
            } else {
                Preconditions.checkState(arrayList.size() == 0);
                this.mainOperatorWrapper = null;
                this.tailOperatorWrapper = null;
            }
            this.chainedSources = createChainedSources(streamTask, configuration.getInputs(userCodeClassLoader), transitiveChainedTaskConfigsWithSelf, userCodeClassLoader, arrayList);
            this.numOperators = arrayList.size();
            this.firstOperatorWrapper = linkOperatorWrappers(arrayList);
            z = true;
            if (1 == 0) {
                for (int i = 0; i < this.streamOutputs.length; i++) {
                    if (this.streamOutputs[i] != null) {
                        this.streamOutputs[i].close();
                    }
                    this.streamOutputs[i] = null;
                }
            }
        } catch (Throwable th) {
            if (!z) {
                for (int i2 = 0; i2 < this.streamOutputs.length; i2++) {
                    if (this.streamOutputs[i2] != null) {
                        this.streamOutputs[i2].close();
                    }
                    this.streamOutputs[i2] = null;
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public OperatorChain(List<StreamOperatorWrapper<?, ?>> list, RecordWriterOutput<?>[] recordWriterOutputArr, WatermarkGaugeExposingOutput<StreamRecord<OUT>> watermarkGaugeExposingOutput, StreamOperatorWrapper<OUT, OP> streamOperatorWrapper) {
        this.closer = Closer.create();
        this.streamOutputs = recordWriterOutputArr;
        this.finishedOnRestoreInput = null;
        this.mainOperatorOutput = (WatermarkGaugeExposingOutput) Preconditions.checkNotNull(watermarkGaugeExposingOutput);
        this.operatorEventDispatcher = null;
        Preconditions.checkState(list != null && list.size() > 0);
        this.mainOperatorWrapper = (StreamOperatorWrapper) Preconditions.checkNotNull(streamOperatorWrapper);
        this.tailOperatorWrapper = list.get(0);
        this.numOperators = list.size();
        this.chainedSources = Collections.emptyMap();
        this.firstOperatorWrapper = linkOperatorWrappers(list);
    }

    public abstract boolean isTaskDeployedAsFinished();

    public abstract void dispatchOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException;

    public abstract void prepareSnapshotPreBarrier(long j) throws Exception;

    @Override // org.apache.flink.streaming.api.operators.BoundedMultiInput
    public abstract void endInput(int i) throws Exception;

    public abstract void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception;

    public abstract void finishOperators(StreamTaskActionExecutor streamTaskActionExecutor, StopMode stopMode) throws Exception;

    public abstract void notifyCheckpointComplete(long j) throws Exception;

    public abstract void notifyCheckpointAborted(long j) throws Exception;

    public abstract void notifyCheckpointSubsumed(long j) throws Exception;

    public abstract void snapshotState(Map<OperatorID, OperatorSnapshotFutures> map, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, Supplier<Boolean> supplier, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, CheckpointStreamFactory checkpointStreamFactory) throws Exception;

    public OperatorEventDispatcher getOperatorEventDispatcher() {
        return this.operatorEventDispatcher;
    }

    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException {
        broadcastEvent(abstractEvent, false);
    }

    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
            recordWriterOutput.broadcastEvent(abstractEvent, z);
        }
    }

    public void alignedBarrierTimeout(long j) throws IOException {
        for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
            recordWriterOutput.alignedBarrierTimeout(j);
        }
    }

    public void abortCheckpoint(long j, CheckpointException checkpointException) {
        for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
            recordWriterOutput.abortCheckpoint(j, checkpointException);
        }
    }

    public void closeAllOperators() throws Exception {
        this.isClosed = true;
    }

    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.streamOutputs;
    }

    @VisibleForTesting
    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators() {
        return getAllOperators(false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators(boolean z) {
        return z ? new StreamOperatorWrapper.ReadIterator(this.tailOperatorWrapper, true) : new StreamOperatorWrapper.ReadIterator(this.mainOperatorWrapper, false);
    }

    public Input getFinishedOnRestoreInputOrDefault(Input input) {
        return this.finishedOnRestoreInput == null ? input : this.finishedOnRestoreInput;
    }

    public int getNumberOfOperators() {
        return this.numOperators;
    }

    public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput() {
        return this.mainOperatorOutput;
    }

    public ChainedSource getChainedSource(StreamConfig.SourceInputConfig sourceInputConfig) {
        Preconditions.checkArgument(this.chainedSources.containsKey(sourceInputConfig), "Chained source with sourcedId = [%s] was not found", sourceInputConfig);
        return this.chainedSources.get(sourceInputConfig);
    }

    public List<Output<StreamRecord<?>>> getChainedSourceOutputs() {
        return (List) this.chainedSources.values().stream().map((v0) -> {
            return v0.getSourceOutput();
        }).collect(Collectors.toList());
    }

    public StreamTaskSourceInput<?> getSourceTaskInput(StreamConfig.SourceInputConfig sourceInputConfig) {
        Preconditions.checkArgument(this.chainedSources.containsKey(sourceInputConfig), "Chained source with sourcedId = [%s] was not found", sourceInputConfig);
        return this.chainedSources.get(sourceInputConfig).getSourceTaskInput();
    }

    public List<StreamTaskSourceInput<?>> getSourceTaskInputs() {
        return (List) this.chainedSources.values().stream().map((v0) -> {
            return v0.getSourceTaskInput();
        }).collect(Collectors.toList());
    }

    public void flushOutputs() throws IOException {
        for (RecordWriterOutput<?> recordWriterOutput : getStreamOutputs()) {
            recordWriterOutput.flush();
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closer.close();
    }

    @Nullable
    public OP getMainOperator() {
        if (this.mainOperatorWrapper == null) {
            return null;
        }
        return this.mainOperatorWrapper.getStreamOperator();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r0v4, types: [org.apache.flink.streaming.api.operators.StreamOperator, org.apache.flink.streaming.api.operators.StreamOperator<?>] */
    @Nullable
    public StreamOperator<?> getTailOperator() {
        if (this.tailOperatorWrapper == null) {
            return null;
        }
        return this.tailOperatorWrapper.getStreamOperator();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void snapshotChannelStates(StreamOperator<?> streamOperator, ChannelStateWriter.ChannelStateWriteResult channelStateWriteResult, OperatorSnapshotFutures operatorSnapshotFutures) {
        if (streamOperator == getMainOperator()) {
            operatorSnapshotFutures.setInputChannelStateFuture(channelStateWriteResult.getInputChannelStateHandles().thenApply(StateObjectCollection::new).thenApply((Function<? super U, ? extends U>) (v0) -> {
                return SnapshotResult.of(v0);
            }));
        }
        if (streamOperator == getTailOperator()) {
            operatorSnapshotFutures.setResultSubpartitionStateFuture(channelStateWriteResult.getResultSubpartitionStateHandles().thenApply(StateObjectCollection::new).thenApply((Function<? super U, ? extends U>) (v0) -> {
                return SnapshotResult.of(v0);
            }));
        }
    }

    public boolean isClosed() {
        return this.isClosed;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void createChainOutputs(List<NonChainedOutput> list, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, Map<Integer, StreamConfig> map, StreamTask<OUT, OP> streamTask, Map<IntermediateDataSetID, RecordWriterOutput<?>> map2) {
        for (int i = 0; i < list.size(); i++) {
            NonChainedOutput nonChainedOutput = list.get(i);
            RecordWriterOutput<OUT> createStreamOutput = createStreamOutput(recordWriterDelegate.getRecordWriter(i), nonChainedOutput, map.get(Integer.valueOf(nonChainedOutput.getSourceNodeId())), streamTask.getEnvironment());
            this.streamOutputs[i] = createStreamOutput;
            map2.put(nonChainedOutput.getDataSetId(), createStreamOutput);
        }
    }

    private RecordWriterOutput<OUT> createStreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, NonChainedOutput nonChainedOutput, StreamConfig streamConfig, Environment environment) {
        return (RecordWriterOutput) this.closer.register(new RecordWriterOutput(recordWriter, nonChainedOutput.getOutputTag() != null ? streamConfig.getTypeSerializerSideOut(nonChainedOutput.getOutputTag(), environment.getUserCodeClassLoader().asClassLoader()) : streamConfig.getTypeSerializerOut(environment.getUserCodeClassLoader().asClassLoader()), nonChainedOutput.getOutputTag(), nonChainedOutput.supportsUnalignedCheckpoints()));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(StreamTask<OUT, OP> streamTask, StreamConfig.InputConfig[] inputConfigArr, Map<Integer, StreamConfig> map, ClassLoader classLoader, List<StreamOperatorWrapper<?, ?>> list) {
        StreamTaskSourceInput streamTaskSourceInput;
        if (Arrays.stream(inputConfigArr).noneMatch(inputConfig -> {
            return inputConfig instanceof StreamConfig.SourceInputConfig;
        })) {
            return Collections.emptyMap();
        }
        Preconditions.checkState(this.mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator, "Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
        HashMap hashMap = new HashMap();
        MultipleInputStreamOperator multipleInputStreamOperator = (MultipleInputStreamOperator) this.mainOperatorWrapper.getStreamOperator();
        List<Input> inputs = multipleInputStreamOperator.getInputs();
        int orElse = Arrays.stream(streamTask.getEnvironment().getAllInputGates()).mapToInt((v0) -> {
            return v0.getInputGateIndex();
        }).max().orElse(-1) + 1;
        for (int i = 0; i < inputConfigArr.length; i++) {
            if (inputConfigArr[i] instanceof StreamConfig.SourceInputConfig) {
                StreamConfig.SourceInputConfig sourceInputConfig = (StreamConfig.SourceInputConfig) inputConfigArr[i];
                StreamConfig streamConfig = map.get(Integer.valueOf(sourceInputConfig.getInputEdge().getSourceId()));
                WatermarkGaugeExposingOutput<StreamRecord<OUT>> createChainedSourceOutput = createChainedSourceOutput(streamTask, streamConfig, classLoader, getFinishedOnRestoreInputOrDefault(inputs.get(i)), multipleInputStreamOperator.getMetricGroup(), sourceInputConfig.getInputEdge().getOutputTag());
                SourceOperator sourceOperator = (SourceOperator) createOperator(streamTask, streamConfig, classLoader, createChainedSourceOutput, list, true);
                if (isTaskDeployedAsFinished()) {
                    int i2 = orElse;
                    orElse++;
                    streamTaskSourceInput = new StreamTaskFinishedOnRestoreSourceInput(sourceOperator, i2, i);
                } else {
                    int i3 = orElse;
                    orElse++;
                    streamTaskSourceInput = new StreamTaskSourceInput(sourceOperator, i3, i);
                }
                hashMap.put(sourceInputConfig, new ChainedSource(createChainedSourceOutput, streamTaskSourceInput));
            }
        }
        return hashMap;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.flink.streaming.runtime.tasks.ChainingOutput] */
    private WatermarkGaugeExposingOutput<StreamRecord> createChainedSourceOutput(StreamTask<?, OP> streamTask, StreamConfig streamConfig, ClassLoader classLoader, Input input, OperatorMetricGroup operatorMetricGroup, OutputTag outputTag) {
        return (WatermarkGaugeExposingOutput) this.closer.register(streamTask.getExecutionConfig().isObjectReuseEnabled() ? new ChainingOutput(input, operatorMetricGroup, outputTag) : new CopyingChainingOutput(input, streamConfig.getTypeSerializerOut(classLoader), operatorMetricGroup, outputTag));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<IntermediateDataSetID, RecordWriterOutput<?>> map2, List<StreamOperatorWrapper<?, ?>> list, MailboxExecutorFactory mailboxExecutorFactory) {
        ArrayList arrayList = new ArrayList(4);
        Iterator<NonChainedOutput> it = streamConfig.getOperatorNonChainedOutputs(classLoader).iterator();
        while (it.hasNext()) {
            arrayList.add(map2.get(it.next().getDataSetId()));
        }
        for (StreamEdge streamEdge : streamConfig.getChainedOutputs(classLoader)) {
            arrayList.add(createOperatorChain(streamTask, map.get(Integer.valueOf(streamEdge.getTargetId())), map, classLoader, map2, list, streamEdge.getOutputTag(), mailboxExecutorFactory));
        }
        if (arrayList.size() == 1) {
            return (WatermarkGaugeExposingOutput) arrayList.get(0);
        }
        Output[] outputArr = new Output[arrayList.size()];
        for (int i = 0; i < arrayList.size(); i++) {
            outputArr[i] = (Output) arrayList.get(i);
        }
        return streamTask.getExecutionConfig().isObjectReuseEnabled() ? (WatermarkGaugeExposingOutput) this.closer.register(new CopyingBroadcastingOutputCollector(outputArr)) : (WatermarkGaugeExposingOutput) this.closer.register(new BroadcastingOutputCollector(outputArr));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(StreamTask<OUT, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<IntermediateDataSetID, RecordWriterOutput<?>> map2, List<StreamOperatorWrapper<?, ?>> list, OutputTag<IN> outputTag, MailboxExecutorFactory mailboxExecutorFactory) {
        return wrapOperatorIntoOutput((OneInputStreamOperator) createOperator(streamTask, streamConfig, classLoader, createOutputCollector(streamTask, streamConfig, map, classLoader, map2, list, mailboxExecutorFactory), list, false), streamTask, streamConfig, classLoader, outputTag);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <OUT, OP extends StreamOperator<OUT>> OP createOperator(StreamTask<OUT, ?> streamTask, StreamConfig streamConfig, ClassLoader classLoader, WatermarkGaugeExposingOutput<StreamRecord<OUT>> watermarkGaugeExposingOutput, List<StreamOperatorWrapper<?, ?>> list, boolean z) {
        Tuple2 createOperator = StreamOperatorFactoryUtil.createOperator(streamConfig.getStreamOperatorFactory(classLoader), streamTask, streamConfig, watermarkGaugeExposingOutput, this.operatorEventDispatcher);
        OP op = (OP) createOperator.f0;
        list.add(createOperatorWrapper(op, streamTask, streamConfig, (Optional) createOperator.f1, z));
        OperatorMetricGroup metricGroup = op.getMetricGroup();
        Gauge<Long> watermarkGauge = watermarkGaugeExposingOutput.getWatermarkGauge();
        watermarkGauge.getClass();
        metricGroup.gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, (String) watermarkGauge::getValue);
        return op;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13, types: [org.apache.flink.streaming.runtime.tasks.ChainingOutput] */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, StreamTask<OUT, ?> streamTask, StreamConfig streamConfig, ClassLoader classLoader, OutputTag<IN> outputTag) {
        CopyingChainingOutput chainingOutput = streamTask.getExecutionConfig().isObjectReuseEnabled() ? new ChainingOutput(oneInputStreamOperator, outputTag) : new CopyingChainingOutput(oneInputStreamOperator, streamConfig.getTypeSerializerIn1(classLoader), outputTag);
        OperatorMetricGroup metricGroup = oneInputStreamOperator.getMetricGroup();
        Gauge<Long> watermarkGauge = chainingOutput.getWatermarkGauge();
        watermarkGauge.getClass();
        metricGroup.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, (String) watermarkGauge::getValue);
        return (WatermarkGaugeExposingOutput) this.closer.register(chainingOutput);
    }

    private StreamOperatorWrapper<?, ?> linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> list) {
        StreamOperatorWrapper<?, ?> streamOperatorWrapper = null;
        for (StreamOperatorWrapper<?, ?> streamOperatorWrapper2 : list) {
            if (streamOperatorWrapper != null) {
                streamOperatorWrapper.setPrevious(streamOperatorWrapper2);
            }
            streamOperatorWrapper2.setNext(streamOperatorWrapper);
            streamOperatorWrapper = streamOperatorWrapper2;
        }
        return streamOperatorWrapper;
    }

    private <T, P extends StreamOperator<T>> StreamOperatorWrapper<T, P> createOperatorWrapper(P p, StreamTask<?, ?> streamTask, StreamConfig streamConfig, Optional<ProcessingTimeService> optional, boolean z) {
        return new StreamOperatorWrapper<>(p, optional, streamTask.getMailboxExecutorFactory().createExecutor(streamConfig.getChainIndex()), z);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void sendAcknowledgeCheckpointEvent(long j) {
        if (this.operatorEventDispatcher == null) {
            return;
        }
        this.operatorEventDispatcher.getRegisteredOperators().forEach(operatorID -> {
            this.operatorEventDispatcher.getOperatorEventGateway(operatorID).sendEventToCoordinator(new AcknowledgeCheckpointEvent(j));
        });
    }
}
