package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventDispatcher;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain.class */
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer, BoundedMultiInput {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    private final RecordWriterOutput<?>[] streamOutputs;
    private final WatermarkGaugeExposingOutput<StreamRecord<OUT>> mainOperatorOutput;

    @Nullable
    private final StreamOperatorWrapper<OUT, OP> mainOperatorWrapper;

    @Nullable
    private final StreamOperatorWrapper<?, ?> firstOperatorWrapper;

    @Nullable
    private final StreamOperatorWrapper<?, ?> tailOperatorWrapper;
    private final Map<StreamConfig.SourceInputConfig, ChainedSource> chainedSources;
    private final int numOperators;
    private final OperatorEventDispatcherImpl operatorEventDispatcher;
    private boolean ignoreEndOfInput;
    private StreamStatus streamStatus;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/OperatorChain$ChainedSource.class */
    public static class ChainedSource {
        private final WatermarkGaugeExposingOutput<StreamRecord<?>> chainedSourceOutput;
        private final StreamTaskSourceInput<?> sourceTaskInput;

        public ChainedSource(WatermarkGaugeExposingOutput<StreamRecord<?>> watermarkGaugeExposingOutput, StreamTaskSourceInput<?> streamTaskSourceInput) {
            this.chainedSourceOutput = watermarkGaugeExposingOutput;
            this.sourceTaskInput = streamTaskSourceInput;
        }

        public Output<StreamRecord<?>> getSourceOutput() {
            return this.chainedSourceOutput;
        }

        public StreamTaskSourceInput<?> getSourceTaskInput() {
            return this.sourceTaskInput;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public OperatorChain(StreamTask<OUT, OP> streamTask, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate) {
        this.streamStatus = StreamStatus.ACTIVE;
        this.operatorEventDispatcher = new OperatorEventDispatcherImpl(streamTask.getEnvironment().getUserCodeClassLoader().asClassLoader(), streamTask.getEnvironment().getOperatorCoordinatorEventGateway());
        ClassLoader userCodeClassLoader = streamTask.getUserCodeClassLoader();
        StreamConfig configuration = streamTask.getConfiguration();
        StreamOperatorFactory streamOperatorFactory = configuration.getStreamOperatorFactory(userCodeClassLoader);
        Map<Integer, StreamConfig> transitiveChainedTaskConfigsWithSelf = configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassLoader);
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassLoader);
        Map<StreamEdge, RecordWriterOutput<?>> hashMap = new HashMap<>(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput[outEdgesInOrder.size()];
        boolean z = false;
        try {
            createChainOutputs(outEdgesInOrder, recordWriterDelegate, transitiveChainedTaskConfigsWithSelf, streamTask, hashMap);
            List<StreamOperatorWrapper<?, ?>> arrayList = new ArrayList<>(transitiveChainedTaskConfigsWithSelf.size());
            this.mainOperatorOutput = createOutputCollector(streamTask, configuration, transitiveChainedTaskConfigsWithSelf, userCodeClassLoader, hashMap, arrayList, streamTask.getMailboxExecutorFactory());
            if (streamOperatorFactory != null) {
                Tuple2 createOperator = StreamOperatorFactoryUtil.createOperator(streamOperatorFactory, streamTask, configuration, this.mainOperatorOutput, this.operatorEventDispatcher);
                StreamOperator streamOperator = (StreamOperator) createOperator.f0;
                streamOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, (String) this.mainOperatorOutput.getWatermarkGauge());
                this.mainOperatorWrapper = createOperatorWrapper(streamOperator, streamTask, configuration, (Optional) createOperator.f1, true);
                arrayList.add(this.mainOperatorWrapper);
                this.tailOperatorWrapper = arrayList.get(0);
            } else {
                Preconditions.checkState(arrayList.size() == 0);
                this.mainOperatorWrapper = null;
                this.tailOperatorWrapper = null;
            }
            this.chainedSources = createChainedSources(streamTask, configuration.getInputs(userCodeClassLoader), transitiveChainedTaskConfigsWithSelf, userCodeClassLoader, arrayList);
            this.numOperators = arrayList.size();
            this.firstOperatorWrapper = linkOperatorWrappers(arrayList);
            z = true;
            if (1 == 0) {
                for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
                    if (recordWriterOutput != null) {
                        recordWriterOutput.close();
                    }
                }
            }
        } catch (Throwable th) {
            if (!z) {
                for (RecordWriterOutput<?> recordWriterOutput2 : this.streamOutputs) {
                    if (recordWriterOutput2 != null) {
                        recordWriterOutput2.close();
                    }
                }
            }
            throw th;
        }
    }

    @VisibleForTesting
    OperatorChain(List<StreamOperatorWrapper<?, ?>> list, RecordWriterOutput<?>[] recordWriterOutputArr, WatermarkGaugeExposingOutput<StreamRecord<OUT>> watermarkGaugeExposingOutput, StreamOperatorWrapper<OUT, OP> streamOperatorWrapper) {
        this.streamStatus = StreamStatus.ACTIVE;
        this.streamOutputs = (RecordWriterOutput[]) Preconditions.checkNotNull(recordWriterOutputArr);
        this.mainOperatorOutput = (WatermarkGaugeExposingOutput) Preconditions.checkNotNull(watermarkGaugeExposingOutput);
        this.operatorEventDispatcher = null;
        Preconditions.checkState(list != null && list.size() > 0);
        this.mainOperatorWrapper = (StreamOperatorWrapper) Preconditions.checkNotNull(streamOperatorWrapper);
        this.tailOperatorWrapper = list.get(0);
        this.numOperators = list.size();
        this.chainedSources = Collections.emptyMap();
        this.firstOperatorWrapper = linkOperatorWrappers(list);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void createChainOutputs(List<StreamEdge> list, RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate, Map<Integer, StreamConfig> map, StreamTask<OUT, OP> streamTask, Map<StreamEdge, RecordWriterOutput<?>> map2) {
        for (int i = 0; i < list.size(); i++) {
            StreamEdge streamEdge = list.get(i);
            RecordWriterOutput<OUT> createStreamOutput = createStreamOutput(recordWriterDelegate.getRecordWriter(i), streamEdge, map.get(Integer.valueOf(streamEdge.getSourceId())), streamTask.getEnvironment());
            this.streamOutputs[i] = createStreamOutput;
            map2.put(streamEdge, createStreamOutput);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Map<StreamConfig.SourceInputConfig, ChainedSource> createChainedSources(StreamTask<OUT, OP> streamTask, StreamConfig.InputConfig[] inputConfigArr, Map<Integer, StreamConfig> map, ClassLoader classLoader, List<StreamOperatorWrapper<?, ?>> list) {
        if (Arrays.stream(inputConfigArr).noneMatch(inputConfig -> {
            return inputConfig instanceof StreamConfig.SourceInputConfig;
        })) {
            return Collections.emptyMap();
        }
        Preconditions.checkState(this.mainOperatorWrapper.getStreamOperator() instanceof MultipleInputStreamOperator, "Creating chained input is only supported with MultipleInputStreamOperator and MultipleInputStreamTask");
        HashMap hashMap = new HashMap();
        MultipleInputStreamOperator multipleInputStreamOperator = (MultipleInputStreamOperator) this.mainOperatorWrapper.getStreamOperator();
        List<Input> inputs = multipleInputStreamOperator.getInputs();
        int orElse = Arrays.stream(streamTask.getEnvironment().getAllInputGates()).mapToInt((v0) -> {
            return v0.getInputGateIndex();
        }).max().orElse(-1) + 1;
        for (int i = 0; i < inputConfigArr.length; i++) {
            if (inputConfigArr[i] instanceof StreamConfig.SourceInputConfig) {
                StreamConfig.SourceInputConfig sourceInputConfig = (StreamConfig.SourceInputConfig) inputConfigArr[i];
                StreamConfig streamConfig = map.get(Integer.valueOf(sourceInputConfig.getInputEdge().getSourceId()));
                WatermarkGaugeExposingOutput<StreamRecord<OUT>> createChainedSourceOutput = createChainedSourceOutput(streamTask, inputs.get(i), (OperatorMetricGroup) multipleInputStreamOperator.getMetricGroup(), sourceInputConfig.getInputEdge().getOutputTag());
                int i2 = orElse;
                orElse++;
                hashMap.put(sourceInputConfig, new ChainedSource(createChainedSourceOutput, new StreamTaskSourceInput((SourceOperator) createOperator(streamTask, streamConfig, classLoader, createChainedSourceOutput, list, true), i2, i)));
            }
        }
        return hashMap;
    }

    private WatermarkGaugeExposingOutput<StreamRecord> createChainedSourceOutput(StreamTask<?, OP> streamTask, Input input, OperatorMetricGroup operatorMetricGroup, OutputTag outputTag) {
        if (streamTask.getExecutionConfig().isObjectReuseEnabled()) {
            return new ChainingOutput(input, operatorMetricGroup, this, outputTag, null);
        }
        throw new UnsupportedOperationException("Currently chained sources are supported only with objectReuse enabled");
    }

    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider
    public StreamStatus getStreamStatus() {
        return this.streamStatus;
    }

    public OperatorEventDispatcher getOperatorEventDispatcher() {
        return this.operatorEventDispatcher;
    }

    public void dispatchOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException {
        this.operatorEventDispatcher.dispatchEventToHandlers(operatorID, serializedValue);
    }

    @Override // org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer
    public void toggleStreamStatus(StreamStatus streamStatus) {
        if (streamStatus.equals(this.streamStatus)) {
            return;
        }
        this.streamStatus = streamStatus;
        for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
            recordWriterOutput.emitStreamStatus(streamStatus);
        }
    }

    public void broadcastEvent(AbstractEvent abstractEvent) throws IOException {
        broadcastEvent(abstractEvent, false);
    }

    public void broadcastEvent(AbstractEvent abstractEvent, boolean z) throws IOException {
        for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
            recordWriterOutput.broadcastEvent(abstractEvent, z);
        }
    }

    /* JADX WARN: Type inference failed for: r0v11, types: [org.apache.flink.streaming.api.operators.StreamOperator] */
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        for (StreamOperatorWrapper<?, ?> streamOperatorWrapper : getAllOperators()) {
            if (!streamOperatorWrapper.isClosed()) {
                streamOperatorWrapper.getStreamOperator().prepareSnapshotPreBarrier(j);
            }
        }
    }

    @Override // org.apache.flink.streaming.api.operators.BoundedMultiInput
    public void endInput(int i) throws Exception {
        if (this.mainOperatorWrapper == null || this.ignoreEndOfInput) {
            return;
        }
        this.mainOperatorWrapper.endOperatorInput(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Type inference failed for: r0v9, types: [org.apache.flink.streaming.api.operators.StreamOperator] */
    public void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        Iterator<StreamOperatorWrapper<?, ?>> it = getAllOperators(true).iterator();
        while (it.hasNext()) {
            ?? streamOperator = it.next().getStreamOperator();
            streamOperator.initializeState(streamTaskStateInitializer);
            streamOperator.open();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeOperators(StreamTaskActionExecutor streamTaskActionExecutor) throws Exception {
        if (this.firstOperatorWrapper != null) {
            this.firstOperatorWrapper.close(streamTaskActionExecutor, this.ignoreEndOfInput);
        }
    }

    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.streamOutputs;
    }

    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators() {
        return getAllOperators(false);
    }

    public Iterable<StreamOperatorWrapper<?, ?>> getAllOperators(boolean z) {
        return z ? new StreamOperatorWrapper.ReadIterator(this.tailOperatorWrapper, true) : new StreamOperatorWrapper.ReadIterator(this.mainOperatorWrapper, false);
    }

    public int getNumberOfOperators() {
        return this.numOperators;
    }

    public WatermarkGaugeExposingOutput<StreamRecord<OUT>> getMainOperatorOutput() {
        return this.mainOperatorOutput;
    }

    public Output<StreamRecord<?>> getChainedSourceOutput(StreamConfig.SourceInputConfig sourceInputConfig) {
        Preconditions.checkArgument(this.chainedSources.containsKey(sourceInputConfig), "Chained source with sourcedId = [%s] was not found", sourceInputConfig);
        return this.chainedSources.get(sourceInputConfig).getSourceOutput();
    }

    public StreamTaskSourceInput<?> getSourceTaskInput(StreamConfig.SourceInputConfig sourceInputConfig) {
        Preconditions.checkArgument(this.chainedSources.containsKey(sourceInputConfig), "Chained source with sourcedId = [%s] was not found", sourceInputConfig);
        return this.chainedSources.get(sourceInputConfig).getSourceTaskInput();
    }

    public List<StreamTaskSourceInput<?>> getSourceTaskInputs() {
        return (List) this.chainedSources.values().stream().map((v0) -> {
            return v0.getSourceTaskInput();
        }).collect(Collectors.toList());
    }

    public void flushOutputs() throws IOException {
        for (RecordWriterOutput<?> recordWriterOutput : getStreamOutputs()) {
            recordWriterOutput.flush();
        }
    }

    public void releaseOutputs() {
        for (RecordWriterOutput<?> recordWriterOutput : this.streamOutputs) {
            recordWriterOutput.close();
        }
    }

    @Nullable
    public OP getMainOperator() {
        if (this.mainOperatorWrapper == null) {
            return null;
        }
        return this.mainOperatorWrapper.getStreamOperator();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <T> WatermarkGaugeExposingOutput<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, List<StreamOperatorWrapper<?, ?>> list, MailboxExecutorFactory mailboxExecutorFactory) {
        ArrayList arrayList = new ArrayList(4);
        for (StreamEdge streamEdge : streamConfig.getNonChainedOutputs(classLoader)) {
            arrayList.add(new Tuple2(map2.get(streamEdge), streamEdge));
        }
        for (StreamEdge streamEdge2 : streamConfig.getChainedOutputs(classLoader)) {
            arrayList.add(new Tuple2(createOperatorChain(streamTask, map.get(Integer.valueOf(streamEdge2.getTargetId())), map, classLoader, map2, list, streamEdge2.getOutputTag(), mailboxExecutorFactory), streamEdge2));
        }
        if (arrayList.size() == 1) {
            return (WatermarkGaugeExposingOutput) ((Tuple2) arrayList.get(0)).f0;
        }
        Output[] outputArr = new Output[arrayList.size()];
        for (int i = 0; i < arrayList.size(); i++) {
            outputArr[i] = (Output) ((Tuple2) arrayList.get(i)).f0;
        }
        return streamTask.getExecutionConfig().isObjectReuseEnabled() ? new CopyingBroadcastingOutputCollector(outputArr, this) : new BroadcastingOutputCollector(outputArr, this);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createOperatorChain(StreamTask<OUT, ?> streamTask, StreamConfig streamConfig, Map<Integer, StreamConfig> map, ClassLoader classLoader, Map<StreamEdge, RecordWriterOutput<?>> map2, List<StreamOperatorWrapper<?, ?>> list, OutputTag<IN> outputTag, MailboxExecutorFactory mailboxExecutorFactory) {
        return wrapOperatorIntoOutput((OneInputStreamOperator) createOperator(streamTask, streamConfig, classLoader, createOutputCollector(streamTask, streamConfig, map, classLoader, map2, list, mailboxExecutorFactory), list, false), streamTask, streamConfig, classLoader, outputTag);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <OUT, OP extends StreamOperator<OUT>> OP createOperator(StreamTask<OUT, ?> streamTask, StreamConfig streamConfig, ClassLoader classLoader, WatermarkGaugeExposingOutput<StreamRecord<OUT>> watermarkGaugeExposingOutput, List<StreamOperatorWrapper<?, ?>> list, boolean z) {
        Tuple2 createOperator = StreamOperatorFactoryUtil.createOperator(streamConfig.getStreamOperatorFactory(classLoader), streamTask, streamConfig, watermarkGaugeExposingOutput, this.operatorEventDispatcher);
        OP op = (OP) createOperator.f0;
        list.add(createOperatorWrapper(op, streamTask, streamConfig, (Optional) createOperator.f1, z));
        MetricGroup metricGroup = op.getMetricGroup();
        Gauge<Long> watermarkGauge = watermarkGaugeExposingOutput.getWatermarkGauge();
        watermarkGauge.getClass();
        metricGroup.gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, (String) watermarkGauge::getValue);
        return op;
    }

    private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> wrapOperatorIntoOutput(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, StreamTask<OUT, ?> streamTask, StreamConfig streamConfig, ClassLoader classLoader, OutputTag<IN> outputTag) {
        WatermarkGaugeExposingOutput chainingOutput = streamTask.getExecutionConfig().isObjectReuseEnabled() ? new ChainingOutput(oneInputStreamOperator, this, outputTag) : new CopyingChainingOutput(oneInputStreamOperator, streamConfig.getTypeSerializerIn1(classLoader), outputTag, this);
        MetricGroup metricGroup = oneInputStreamOperator.getMetricGroup();
        Gauge<Long> watermarkGauge = chainingOutput.getWatermarkGauge();
        watermarkGauge.getClass();
        metricGroup.gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, (String) watermarkGauge::getValue);
        return chainingOutput;
    }

    private RecordWriterOutput<OUT> createStreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter, StreamEdge streamEdge, StreamConfig streamConfig, Environment environment) {
        return new RecordWriterOutput<>(recordWriter, streamEdge.getOutputTag() != null ? streamConfig.getTypeSerializerSideOut(streamEdge.getOutputTag(), environment.getUserCodeClassLoader().asClassLoader()) : streamConfig.getTypeSerializerOut(environment.getUserCodeClassLoader().asClassLoader()), streamEdge.getOutputTag(), this);
    }

    private StreamOperatorWrapper<?, ?> linkOperatorWrappers(List<StreamOperatorWrapper<?, ?>> list) {
        StreamOperatorWrapper<?, ?> streamOperatorWrapper = null;
        for (StreamOperatorWrapper<?, ?> streamOperatorWrapper2 : list) {
            if (streamOperatorWrapper != null) {
                streamOperatorWrapper.setPrevious(streamOperatorWrapper2);
            }
            streamOperatorWrapper2.setNext(streamOperatorWrapper);
            streamOperatorWrapper = streamOperatorWrapper2;
        }
        return streamOperatorWrapper;
    }

    private <T, P extends StreamOperator<T>> StreamOperatorWrapper<T, P> createOperatorWrapper(P p, StreamTask<?, ?> streamTask, StreamConfig streamConfig, Optional<ProcessingTimeService> optional, boolean z) {
        return new StreamOperatorWrapper<>(p, optional, streamTask.getMailboxExecutorFactory().createExecutor(streamConfig.getChainIndex()), z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Type inference failed for: r0v4, types: [org.apache.flink.streaming.api.operators.StreamOperator, org.apache.flink.streaming.api.operators.StreamOperator<?>] */
    @Nullable
    public StreamOperator<?> getTailOperator() {
        if (this.tailOperatorWrapper == null) {
            return null;
        }
        return this.tailOperatorWrapper.getStreamOperator();
    }

    public void setIgnoreEndOfInput(boolean z) {
        this.ignoreEndOfInput = z;
    }
}
