package org.apache.flink.streaming.api.graph;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StoppableStreamSource;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Mutable description of a streaming topology: stream nodes keyed by vertex id, the edges between
 * them, and the ids registered as sources and sinks.
 *
 * <p>Besides real {@link StreamNode}s the graph tracks two kinds of "virtual" nodes (select and
 * partition). A virtual node only records the id of its upstream vertex together with a list of
 * selected output names or a {@link StreamPartitioner}; it is resolved to the real upstream node
 * when an edge is added through {@link #addEdge(Integer, Integer, int)}.
 *
 * <p>Lookups by vertex id go through {@link #getStreamNode(Integer)}, which returns {@code null}
 * for unknown ids. Unless a method states otherwise, passing an unknown id results in a
 * {@link NullPointerException}.
 */
@Internal
public class StreamGraph extends StreamingPlan {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private boolean chaining;

    /** Real nodes of the graph, keyed by vertex id. */
    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    /** Virtual select node id -> (upstream vertex id, selected output names). */
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    /** Virtual partition node id -> (upstream vertex id, partitioner). */
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;

    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private AbstractStateBackend stateBackend;
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;

    /**
     * Creates an empty graph bound to the given environment. The execution and checkpoint
     * configuration are read from the environment once, at construction time.
     *
     * @param streamExecutionEnvironment environment this graph belongs to
     */
    public StreamGraph(StreamExecutionEnvironment streamExecutionEnvironment) {
        this.environment = streamExecutionEnvironment;
        this.executionConfig = streamExecutionEnvironment.getConfig();
        this.checkpointConfig = streamExecutionEnvironment.getCheckpointConfig();
        // NOTE(review): clear() is public and overridable, so a subclass override runs before the
        // subclass' own constructor body. Kept as-is to preserve existing behavior.
        clear();
    }

    /**
     * Discards all nodes, virtual nodes, source/sink registrations and iteration bookkeeping by
     * replacing every internal collection with a fresh, empty one. Job name, chaining flag and
     * state backend are left untouched.
     */
    public void clear() {
        this.streamNodes = new HashMap<>();
        this.virtualSelectNodes = new HashMap<>();
        this.virtualPartitionNodes = new HashMap<>();
        this.vertexIDtoBrokerID = new HashMap<>();
        this.vertexIDtoLoopTimeout = new HashMap<>();
        this.iterationSourceSinkPairs = new HashSet<>();
        this.sources = new HashSet<>();
        this.sinks = new HashSet<>();
    }

    /** @return the environment passed to the constructor */
    public StreamExecutionEnvironment getEnvironment() {
        return this.environment;
    }

    /** @return the execution config obtained from the environment at construction time */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /** @return the checkpoint config obtained from the environment at construction time */
    public CheckpointConfig getCheckpointConfig() {
        return this.checkpointConfig;
    }

    /** @return the job name; defaults to {@code StreamExecutionEnvironment.DEFAULT_JOB_NAME} */
    public String getJobName() {
        return this.jobName;
    }

    /** @param str the new job name */
    public void setJobName(String str) {
        this.jobName = str;
    }

    /** @param z whether chaining is enabled for this graph */
    public void setChaining(boolean z) {
        this.chaining = z;
    }

    /** @param abstractStateBackend the state backend to record on this graph */
    public void setStateBackend(AbstractStateBackend abstractStateBackend) {
        this.stateBackend = abstractStateBackend;
    }

    /** @return the state backend previously set, or {@code null} if none was set */
    public AbstractStateBackend getStateBackend() {
        return this.stateBackend;
    }

    /** @return the flag last passed to {@link #setChaining(boolean)} */
    public boolean isChainingEnabled() {
        return this.chaining;
    }

    /** @return {@code true} once at least one loop timeout has been registered */
    public boolean isIterative() {
        return !this.vertexIDtoLoopTimeout.isEmpty();
    }

    /**
     * Adds an operator node exactly like {@link #addOperator} and registers its id as a source.
     * Parameters have the same meaning as in {@link #addOperator}.
     */
    public <IN, OUT> void addSource(Integer num, String str, StreamOperator<OUT> streamOperator, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str2) {
        addOperator(num, str, streamOperator, typeInformation, typeInformation2, str2);
        this.sources.add(num);
    }

    /**
     * Adds an operator node exactly like {@link #addOperator} and registers its id as a sink.
     * Parameters have the same meaning as in {@link #addOperator}.
     */
    public <IN, OUT> void addSink(Integer num, String str, StreamOperator<OUT> streamOperator, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str2) {
        addOperator(num, str, streamOperator, typeInformation, typeInformation2, str2);
        this.sinks.add(num);
    }

    /**
     * Adds a node for a single-input (or source) operator and configures its serializers.
     *
     * <p>The task class is chosen from the operator's runtime type: stoppable sources use
     * {@link StoppableSourceStreamTask}, other {@link StreamSource}s use {@link SourceStreamTask},
     * everything else uses {@link OneInputStreamTask}. A serializer is created for the input and
     * output type unless the type information is {@code null} or a {@link MissingTypeInfo}, in
     * which case the corresponding serializer is set to {@code null}.
     *
     * @param num vertex id of the new node; must not already exist
     * @param str passed through to the {@link StreamNode} constructor (presumably the slot sharing
     *     group; confirm against {@code StreamNode})
     * @param streamOperator the operator to run in the node
     * @param typeInformation input type, may be {@code null}
     * @param typeInformation2 output type, may be {@code null}
     * @param str2 passed through to the {@link StreamNode} constructor (presumably the operator
     *     name; confirm against {@code StreamNode})
     * @throws RuntimeException if a node with the given id already exists
     */
    public <IN, OUT> void addOperator(Integer num, String str, StreamOperator<OUT> streamOperator, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, String str2) {
        final Class<? extends AbstractInvokable> taskClass;
        if (streamOperator instanceof StoppableStreamSource) {
            taskClass = StoppableSourceStreamTask.class;
        } else if (streamOperator instanceof StreamSource) {
            taskClass = SourceStreamTask.class;
        } else {
            taskClass = OneInputStreamTask.class;
        }
        addNode(num, str, taskClass, streamOperator, str2);

        setSerializers(num, createSerializerOrNull(typeInformation), null, createSerializerOrNull(typeInformation2));

        if (streamOperator instanceof OutputTypeConfigurable && typeInformation2 != null) {
            @SuppressWarnings("unchecked")
            OutputTypeConfigurable<OUT> outputConfigurable = (OutputTypeConfigurable<OUT>) streamOperator;
            outputConfigurable.setOutputType(typeInformation2, this.executionConfig);
        }
        if (streamOperator instanceof InputTypeConfigurable) {
            // Unlike the output type, the input type is handed over even when it is null.
            ((InputTypeConfigurable) streamOperator).setInputType(typeInformation, this.executionConfig);
        }

        LOG.debug("Vertex: {}", num);
    }

    /**
     * Adds a node for a two-input operator, run as a {@link TwoInputStreamTask}.
     *
     * <p>Serializers for both inputs are always created, so both input types must be non-null.
     * The output serializer is {@code null} when the output type is {@code null} or a
     * {@link MissingTypeInfo}.
     *
     * @param num vertex id of the new node; must not already exist
     * @param str passed through to the {@link StreamNode} constructor (see {@link #addOperator})
     * @param twoInputStreamOperator the operator to run in the node
     * @param typeInformation type of the first input, must not be {@code null}
     * @param typeInformation2 type of the second input, must not be {@code null}
     * @param typeInformation3 output type, may be {@code null}
     * @param str2 passed through to the {@link StreamNode} constructor (see {@link #addOperator})
     * @throws RuntimeException if a node with the given id already exists
     */
    public <IN1, IN2, OUT> void addCoOperator(Integer num, String str, TwoInputStreamOperator<IN1, IN2, OUT> twoInputStreamOperator, TypeInformation<IN1> typeInformation, TypeInformation<IN2> typeInformation2, TypeInformation<OUT> typeInformation3, String str2) {
        addNode(num, str, TwoInputStreamTask.class, twoInputStreamOperator, str2);

        setSerializers(
                num,
                typeInformation.createSerializer(this.executionConfig),
                typeInformation2.createSerializer(this.executionConfig),
                createSerializerOrNull(typeInformation3));

        if (twoInputStreamOperator instanceof OutputTypeConfigurable) {
            // NOTE(review): unlike addOperator, there is no null check on the output type here,
            // so a null type is forwarded to the operator. Behavior kept; confirm it is intended.
            @SuppressWarnings("unchecked")
            OutputTypeConfigurable<OUT> outputConfigurable = (OutputTypeConfigurable<OUT>) twoInputStreamOperator;
            outputConfigurable.setOutputType(typeInformation3, this.executionConfig);
        }

        LOG.debug("CO-TASK: {}", num);
    }

    /**
     * Creates a {@link StreamNode} and stores it under the given vertex id.
     *
     * @param num vertex id of the new node
     * @param str passed through to the {@link StreamNode} constructor (see {@link #addOperator})
     * @param cls task class that will execute the node
     * @param streamOperator operator of the node, may be {@code null}
     * @param str2 passed through to the {@link StreamNode} constructor (see {@link #addOperator})
     * @return the newly created node
     * @throws RuntimeException if a node with the given id already exists
     */
    protected StreamNode addNode(Integer num, String str, Class<? extends AbstractInvokable> cls, StreamOperator<?> streamOperator, String str2) {
        if (this.streamNodes.containsKey(num)) {
            throw new RuntimeException("Duplicate vertexID " + num);
        }
        StreamNode node = new StreamNode(this.environment, num, str, streamOperator, str2, new ArrayList<OutputSelector<?>>(), cls);
        this.streamNodes.put(num, node);
        return node;
    }

    /**
     * Registers a virtual select node.
     *
     * @param num id of the upstream vertex the virtual node stands in for
     * @param num2 id of the virtual node itself
     * @param list output names selected by the virtual node
     * @throws IllegalStateException if a virtual select node with id {@code num2} already exists
     */
    public void addVirtualSelectNode(Integer num, Integer num2, List<String> list) {
        if (this.virtualSelectNodes.containsKey(num2)) {
            throw new IllegalStateException("Already has virtual select node with id " + num2);
        }
        this.virtualSelectNodes.put(num2, new Tuple2<Integer, List<String>>(num, list));
    }

    /**
     * Registers a virtual partition node.
     *
     * @param num id of the upstream vertex the virtual node stands in for
     * @param num2 id of the virtual node itself
     * @param streamPartitioner partitioner applied to edges leaving through the virtual node
     * @throws IllegalStateException if a virtual partition node with id {@code num2} already exists
     */
    public void addVirtualPartitionNode(Integer num, Integer num2, StreamPartitioner<?> streamPartitioner) {
        if (this.virtualPartitionNodes.containsKey(num2)) {
            throw new IllegalStateException("Already has virtual partition node with id " + num2);
        }
        this.virtualPartitionNodes.put(num2, new Tuple2<Integer, StreamPartitioner<?>>(num, streamPartitioner));
    }

    /**
     * Returns the slot sharing group of a vertex. Virtual select and partition nodes are followed
     * (recursively) to their upstream vertex.
     *
     * @param num id of a real or virtual node
     * @return the slot sharing group reported by the resolved {@link StreamNode}
     */
    public String getSlotSharingGroup(Integer num) {
        if (this.virtualSelectNodes.containsKey(num)) {
            return getSlotSharingGroup(this.virtualSelectNodes.get(num).f0);
        }
        if (this.virtualPartitionNodes.containsKey(num)) {
            return getSlotSharingGroup(this.virtualPartitionNodes.get(num).f0);
        }
        return getStreamNode(num).getSlotSharingGroup();
    }

    /**
     * Adds an edge between two vertices. If the upstream id refers to a virtual node, the edge is
     * attached to the real upstream node and inherits the virtual node's selected names and/or
     * partitioner.
     *
     * @param num id of the upstream (real or virtual) node
     * @param num2 id of the downstream node
     * @param i passed through to the {@link StreamEdge} constructor (presumably the input/type
     *     number on the downstream node; confirm against {@code StreamEdge})
     * @throws UnsupportedOperationException if a forward partitioner would connect nodes of
     *     different parallelism
     */
    public void addEdge(Integer num, Integer num2, int i) {
        addEdgeInternal(num, num2, i, null, new ArrayList<String>());
    }

    /**
     * Recursive worker behind {@link #addEdge}. Virtual upstream nodes are unwrapped one level per
     * call; the first non-empty name list and the first non-null partitioner encountered win.
     */
    private void addEdgeInternal(Integer num, Integer num2, int i, StreamPartitioner<?> streamPartitioner, List<String> list) {
        if (this.virtualSelectNodes.containsKey(num)) {
            Tuple2<Integer, List<String>> selectNode = this.virtualSelectNodes.get(num);
            List<String> outputNames = list;
            if (outputNames.isEmpty()) {
                outputNames = selectNode.f1;
            }
            addEdgeInternal(selectNode.f0, num2, i, streamPartitioner, outputNames);
            return;
        }

        if (this.virtualPartitionNodes.containsKey(num)) {
            Tuple2<Integer, StreamPartitioner<?>> partitionNode = this.virtualPartitionNodes.get(num);
            StreamPartitioner<?> inherited = streamPartitioner;
            if (inherited == null) {
                inherited = partitionNode.f1;
            }
            addEdgeInternal(partitionNode.f0, num2, i, inherited, list);
            return;
        }

        StreamNode upstream = getStreamNode(num);
        StreamNode downstream = getStreamNode(num2);
        boolean sameParallelism = upstream.getParallelism() == downstream.getParallelism();

        // Without an explicit partitioner: forward when parallelism matches, rebalance otherwise.
        StreamPartitioner<?> partitioner = streamPartitioner;
        if (partitioner == null) {
            if (sameParallelism) {
                partitioner = new ForwardPartitioner<Object>();
            } else {
                partitioner = new RebalancePartitioner<Object>();
            }
        }

        if (partitioner instanceof ForwardPartitioner && !sameParallelism) {
            throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism. Upstream operation: " + upstream + " parallelism: " + upstream.getParallelism() + ", downstream operation: " + downstream + " parallelism: " + downstream.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
        }

        StreamEdge edge = new StreamEdge(upstream, downstream, i, list, partitioner);
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }

    /**
     * Attaches an output selector to a vertex. Virtual partition and select nodes are followed
     * (recursively) to their upstream vertex, which receives the selector.
     *
     * @param num id of a real or virtual node
     * @param outputSelector the selector to add
     */
    public <T> void addOutputSelector(Integer num, OutputSelector<T> outputSelector) {
        if (this.virtualPartitionNodes.containsKey(num)) {
            addOutputSelector(this.virtualPartitionNodes.get(num).f0, outputSelector);
            return;
        }
        if (this.virtualSelectNodes.containsKey(num)) {
            addOutputSelector(this.virtualSelectNodes.get(num).f0, outputSelector);
            return;
        }
        getStreamNode(num).addOutputSelector(outputSelector);
        LOG.debug("Outputselector set for {}", num);
    }

    /**
     * Sets the parallelism of a node. Unknown vertex ids are silently ignored.
     *
     * @param num vertex id
     * @param i the parallelism to set
     */
    public void setParallelism(Integer num, int i) {
        StreamNode node = getStreamNode(num);
        if (node != null) {
            node.setParallelism(Integer.valueOf(i));
        }
    }

    /**
     * Sets the maximum parallelism of a node. Unknown vertex ids are silently ignored.
     *
     * @param i vertex id
     * @param i2 the maximum parallelism to set
     */
    public void setMaxParallelism(int i, int i2) {
        StreamNode node = getStreamNode(i);
        if (node != null) {
            node.setMaxParallelism(i2);
        }
    }

    /**
     * Configures keyed state for a single-input node.
     *
     * @param num vertex id; must refer to an existing node
     * @param keySelector stored as the node's first state partitioner
     * @param typeSerializer stored as the node's state key serializer
     */
    public void setOneInputStateKey(Integer num, KeySelector<?, ?> keySelector, TypeSerializer<?> typeSerializer) {
        StreamNode node = getStreamNode(num);
        node.setStatePartitioner1(keySelector);
        node.setStateKeySerializer(typeSerializer);
    }

    /**
     * Configures keyed state for a two-input node.
     *
     * @param num vertex id; must refer to an existing node
     * @param keySelector stored as the node's first state partitioner
     * @param keySelector2 stored as the node's second state partitioner
     * @param typeSerializer stored as the node's state key serializer
     */
    public void setTwoInputStateKey(Integer num, KeySelector<?, ?> keySelector, KeySelector<?, ?> keySelector2, TypeSerializer<?> typeSerializer) {
        StreamNode node = getStreamNode(num);
        node.setStatePartitioner1(keySelector);
        node.setStatePartitioner2(keySelector2);
        node.setStateKeySerializer(typeSerializer);
    }

    /**
     * Sets the buffer timeout of a node. Unknown vertex ids are silently ignored.
     *
     * @param num vertex id
     * @param j the buffer timeout to set (unit is defined by {@code StreamNode})
     */
    public void setBufferTimeout(Integer num, long j) {
        StreamNode node = getStreamNode(num);
        if (node != null) {
            node.setBufferTimeout(Long.valueOf(j));
        }
    }

    /**
     * Sets all three serializers of a node at once.
     *
     * @param num vertex id; must refer to an existing node
     * @param typeSerializer serializer of the first input
     * @param typeSerializer2 serializer of the second input
     * @param typeSerializer3 serializer of the output
     */
    public void setSerializers(Integer num, TypeSerializer<?> typeSerializer, TypeSerializer<?> typeSerializer2, TypeSerializer<?> typeSerializer3) {
        StreamNode node = getStreamNode(num);
        node.setSerializerIn1(typeSerializer);
        node.setSerializerIn2(typeSerializer2);
        node.setSerializerOut(typeSerializer3);
    }

    /**
     * Derives the serializers of one node from another, crosswise: the target's first input
     * serializer becomes the origin's output serializer, and the target's output serializer
     * becomes the origin's first input serializer.
     *
     * @param num id of the node to read serializers from
     * @param num2 id of the node to write serializers to
     */
    public void setSerializersFrom(Integer num, Integer num2) {
        StreamNode origin = getStreamNode(num);
        StreamNode target = getStreamNode(num2);
        target.setSerializerIn1(origin.getTypeSerializerOut());
        target.setSerializerOut(origin.getTypeSerializerIn1());
    }

    /**
     * Replaces a node's output serializer with one created from the given type information.
     *
     * @param num vertex id; must refer to an existing node
     * @param typeInformation output type, must not be {@code null}
     */
    public <OUT> void setOutType(Integer num, TypeInformation<OUT> typeInformation) {
        getStreamNode(num).setSerializerOut(typeInformation.createSerializer(this.executionConfig));
    }

    /**
     * Replaces the operator of a node.
     *
     * @param num vertex id; must refer to an existing node
     * @param streamOperator the new operator
     */
    public <IN, OUT> void setOperator(Integer num, StreamOperator<OUT> streamOperator) {
        getStreamNode(num).setOperator(streamOperator);
    }

    /**
     * Sets the input format of a node.
     *
     * @param num vertex id; must refer to an existing node
     * @param inputFormat the input format to set
     */
    public void setInputFormat(Integer num, InputFormat<?, ?> inputFormat) {
        getStreamNode(num).setInputFormat(inputFormat);
    }

    /**
     * Sets the transformation UID of a node. Unknown vertex ids are silently ignored.
     *
     * <p>NOTE(review): a decompiler marker on this method indicated that its access was widened
     * from package-private; it is kept public so existing callers keep compiling.
     *
     * @param num vertex id
     * @param str the UID to set
     */
    public void setTransformationUID(Integer num, String str) {
        StreamNode node = this.streamNodes.get(num);
        if (node != null) {
            node.setTransformationUID(str);
        }
    }

    /**
     * Sets the user-provided hash of a node. Unknown vertex ids are silently ignored.
     *
     * <p>NOTE(review): a decompiler marker on this method indicated that its access was widened
     * from package-private; it is kept public so existing callers keep compiling.
     *
     * @param num vertex id
     * @param str the user hash to set
     */
    public void setTransformationUserHash(Integer num, String str) {
        StreamNode node = this.streamNodes.get(num);
        if (node != null) {
            node.setUserHash(str);
        }
    }

    /**
     * @param num vertex id
     * @return the node stored under that id, or {@code null} if there is none
     */
    public StreamNode getStreamNode(Integer num) {
        return this.streamNodes.get(num);
    }

    /**
     * Returns a live view of the ids of all real nodes.
     *
     * <p>NOTE(review): a decompiler marker on this method indicated that its access was widened
     * from protected; it is kept public so existing callers keep compiling.
     */
    public Collection<? extends Integer> getVertexIDs() {
        return this.streamNodes.keySet();
    }

    /**
     * Returns all edges that lead from one vertex to another.
     *
     * @param i id of the source vertex
     * @param i2 id of the target vertex
     * @return the matching edges, never empty
     * @throws RuntimeException if there is no such edge, including when the source vertex does
     *     not exist
     */
    public List<StreamEdge> getStreamEdges(int i, int i2) {
        List<StreamEdge> matches = new ArrayList<>();
        StreamNode source = getStreamNode(i);
        // An unknown source vertex has no edges; report it through the regular "no such edge"
        // error below instead of failing with a bare NullPointerException.
        if (source != null) {
            for (StreamEdge edge : source.getOutEdges()) {
                if (edge.getTargetId() == i2) {
                    matches.add(edge);
                }
            }
        }
        if (matches.isEmpty()) {
            throw new RuntimeException("No such edge in stream graph: " + i + " -> " + i2);
        }
        return matches;
    }

    /** @return the internal (live, mutable) set of source vertex ids */
    public Collection<Integer> getSourceIDs() {
        return this.sources;
    }

    /** @return the internal (live, mutable) set of sink vertex ids */
    public Collection<Integer> getSinkIDs() {
        return this.sinks;
    }

    /** @return a live view of all real nodes in the graph */
    public Collection<StreamNode> getStreamNodes() {
        return this.streamNodes.values();
    }

    /** @return a new set holding one (vertex id, operator) pair per real node */
    public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
        Set<Tuple2<Integer, StreamOperator<?>>> operators = new HashSet<>();
        for (StreamNode node : this.streamNodes.values()) {
            operators.add(new Tuple2<Integer, StreamOperator<?>>(node.getId(), node.getOperator()));
        }
        return operators;
    }

    /**
     * @param num vertex id
     * @return the broker id registered for the vertex, or {@code null} if there is none
     */
    public String getBrokerID(Integer num) {
        return this.vertexIDtoBrokerID.get(num);
    }

    /**
     * @param num vertex id
     * @return the loop timeout registered for the vertex
     * @throws NullPointerException if no timeout is registered for the vertex
     */
    public long getLoopTimeout(Integer num) {
        return this.vertexIDtoLoopTimeout.get(num).longValue();
    }

    /**
     * Creates the head ({@link StreamIterationHead}) and tail ({@link StreamIterationTail}) nodes
     * of an iteration, registers them as a source and a sink respectively, and records a shared
     * broker id ({@code "broker-" + i}) and the loop timeout for both.
     *
     * @param i loop id, used in the node names and the broker id
     * @param i2 vertex id for the iteration source (head)
     * @param i3 vertex id for the iteration sink (tail)
     * @param j loop timeout recorded for both nodes
     * @param i4 parallelism for both nodes
     * @param i5 maximum parallelism for the iteration source
     * @return the (source, sink) pair that was created
     * @throws RuntimeException if either vertex id is already in use
     */
    public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int i, int i2, int i3, long j, int i4, int i5) {
        StreamNode source = addNode(i2, null, StreamIterationHead.class, null, "IterationSource-" + i);
        this.sources.add(source.getId());
        setParallelism(source.getId(), i4);
        setMaxParallelism(source.getId(), i5);

        StreamNode sink = addNode(i3, null, StreamIterationTail.class, null, "IterationSink-" + i);
        this.sinks.add(sink.getId());
        setParallelism(sink.getId(), i4);
        // NOTE(review): the sink's max parallelism is set to the parallelism (i4), not to the
        // max parallelism (i5) used for the source. Kept as found; confirm whether intended.
        setMaxParallelism(sink.getId(), i4);

        this.iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(source, sink));

        String brokerId = "broker-" + i;
        this.vertexIDtoBrokerID.put(source.getId(), brokerId);
        this.vertexIDtoBrokerID.put(sink.getId(), brokerId);
        this.vertexIDtoLoopTimeout.put(source.getId(), j);
        this.vertexIDtoLoopTimeout.put(sink.getId(), j);

        return new Tuple2<StreamNode, StreamNode>(source, sink);
    }

    /** @return the internal (live, mutable) set of iteration (source, sink) pairs */
    public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
        return this.iterationSourceSinkPairs;
    }

    /** Detaches an edge from both of its endpoints. */
    private void removeEdge(StreamEdge streamEdge) {
        streamEdge.getSourceVertex().getOutEdges().remove(streamEdge);
        streamEdge.getTargetVertex().getInEdges().remove(streamEdge);
    }

    /** Removes a node together with all of its incoming and outgoing edges. */
    private void removeVertex(StreamNode streamNode) {
        // Snapshot the edges first: removeEdge mutates the node's own edge lists.
        Set<StreamEdge> attachedEdges = new HashSet<>();
        attachedEdges.addAll(streamNode.getInEdges());
        attachedEdges.addAll(streamNode.getOutEdges());
        for (StreamEdge edge : attachedEdges) {
            removeEdge(edge);
        }
        this.streamNodes.remove(streamNode.getId());
    }

    /**
     * Translates this graph into a {@link JobGraph} via {@link StreamingJobGraphGenerator}.
     *
     * @throws UnsupportedOperationException if the graph is iterative and checkpointing is
     *     enabled without being forced
     */
    @Override
    public JobGraph getJobGraph() {
        boolean checkpointingWithoutForce =
                this.checkpointConfig.isCheckpointingEnabled() && !this.checkpointConfig.isForceCheckpointing();
        if (isIterative() && checkpointingWithoutForce) {
            throw new UnsupportedOperationException("Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. \nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
        }
        return new StreamingJobGraphGenerator(this).createJobGraph();
    }

    /**
     * Renders this graph as JSON using {@link JSONGenerator}.
     *
     * @throws RuntimeException wrapping any exception raised while generating the JSON
     */
    @Override
    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        } catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }

    /**
     * Writes the JSON plan (see {@link #getStreamingPlanAsJSON()}) to a file, replacing any
     * existing content.
     *
     * <p>NOTE(review): the file is written in the platform default charset, as before. Consider
     * switching to UTF-8 if plans may contain non-ASCII names.
     *
     * @param file the file to write to
     * @throws IOException if the file cannot be opened or the plan cannot be written completely
     * @throws RuntimeException if generating the JSON plan fails
     */
    @Override
    public void dumpStreamingPlanAsJSON(File file) throws IOException {
        // Render before opening the file so that a failure while generating the plan does not
        // leave a freshly truncated, empty file behind.
        String json = getStreamingPlanAsJSON();
        try (PrintWriter writer = new PrintWriter(new FileOutputStream(file), false)) {
            writer.write(json);
            // PrintWriter never throws IOException from write/flush; checkError() flushes the
            // stream and reports whether any write failed.
            if (writer.checkError()) {
                throw new IOException("Failed to write streaming plan JSON to " + file);
            }
        }
    }

    /**
     * Creates a serializer for the given type, or returns {@code null} when the type information
     * is absent or only a {@link MissingTypeInfo} placeholder.
     */
    private <T> TypeSerializer<T> createSerializerOrNull(TypeInformation<T> typeInfo) {
        if (typeInfo == null || typeInfo instanceof MissingTypeInfo) {
            return null;
        }
        return typeInfo.createSerializer(this.executionConfig);
    }
}
