package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Serializer;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.operators.util.SlotSharingGroupUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorFactory;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.functions.source.FileReadFunction;
import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SplittableIterator;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.WrappingRuntimeException;

@Public
/* loaded from: input_file:org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.class */
public class StreamExecutionEnvironment {

    /** Default job name constant; marked deprecated. */
    @Deprecated
    public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
    /** Time characteristic assigned to new environments by the main constructor. */
    private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.EventTime;
    // Process-wide environment factory; starts out unset. NOTE(review): usage is outside this view.
    private static StreamExecutionEnvironmentFactory contextEnvironmentFactory = null;
    // Per-thread environment factory. NOTE(review): usage is outside this view.
    private static final ThreadLocal<StreamExecutionEnvironmentFactory> threadLocalContextEnvironmentFactory = new ThreadLocal<>();
    /** Initialised to the number of processors reported by the JVM at class-load time. */
    private static int defaultLocalParallelism = Runtime.getRuntime().availableProcessors();
    /** Execution config; parallelism, restart strategy and Kryo registrations are delegated to it. */
    private final ExecutionConfig config;
    /** Checkpoint settings; all checkpoint-related accessors delegate to it. */
    private final CheckpointConfig checkpointCfg;
    /** Transformations collected by this environment. */
    protected final List<Transformation<?>> transformations;
    /** Buffer timeout; the constructor sets it to -1 and {@code setBufferTimeout} rejects values below -1. */
    private long bufferTimeout;
    /** Operator chaining flag; {@code true} until {@code disableOperatorChaining()} or configuration changes it. */
    protected boolean isChainingEnabled;
    /** State backend stored by {@code setStateBackend}; {@code null} until one is set. */
    private StateBackend defaultStateBackend;
    /** Tri-state changelog state backend switch; the constructor sets it to UNDEFINED. */
    private TernaryBoolean changelogStateBackendEnabled;
    /** Savepoint directory stored by {@code setDefaultSavepointDirectory}; {@code null} until one is set. */
    private Path defaultSavepointDirectory;
    /** Time characteristic; starts as {@link #DEFAULT_TIME_CHARACTERISTIC}. */
    private TimeCharacteristic timeCharacteristic;
    /** Cached-file entries as (name, entry) pairs; replaced wholesale when configured via PipelineOptions.CACHED_FILES. */
    protected final List<Tuple2<String, DistributedCache.DistributedCacheEntry>> cacheFile;
    /** Executor service loader handed to the constructor (never null). */
    private final PipelineExecutorServiceLoader executorServiceLoader;
    /** Private copy of the configuration handed to the constructor. */
    protected final Configuration configuration;
    /** User class loader; falls back to this class's own loader when none is supplied. */
    private final ClassLoader userClassloader;
    /** Listeners notified by {@code execute(StreamGraph)} with the job result or failure. */
    private final List<JobListener> jobListeners;
    /** Resource profiles of registered slot sharing groups, keyed by group name. */
    private final Map<String, ResourceProfile> slotSharingGroupResources;

    /**
     * Creates an environment backed by a new, empty {@link Configuration}.
     */
    public StreamExecutionEnvironment() {
        this(new Configuration());
    }

    /**
     * Creates an environment from the given configuration without an explicit user class loader.
     *
     * @param configuration the configuration to start from
     */
    @PublicEvolving
    public StreamExecutionEnvironment(Configuration configuration) {
        this(configuration, (ClassLoader) null);
    }

    /**
     * Creates an environment that uses a {@link DefaultExecutorServiceLoader}.
     *
     * @param configuration the configuration to start from
     * @param classLoader the user class loader, or {@code null} to use this class's own loader
     */
    @PublicEvolving
    public StreamExecutionEnvironment(Configuration configuration, ClassLoader classLoader) {
        this(new DefaultExecutorServiceLoader(), configuration, classLoader);
    }

    @PublicEvolving
    public StreamExecutionEnvironment(PipelineExecutorServiceLoader pipelineExecutorServiceLoader, Configuration configuration, ClassLoader classLoader) {
        this.config = new ExecutionConfig();
        this.checkpointCfg = new CheckpointConfig();
        this.transformations = new ArrayList();
        this.bufferTimeout = -1L;
        this.isChainingEnabled = true;
        this.changelogStateBackendEnabled = TernaryBoolean.UNDEFINED;
        this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
        this.cacheFile = new ArrayList();
        this.jobListeners = new ArrayList();
        this.slotSharingGroupResources = new HashMap();
        this.executorServiceLoader = (PipelineExecutorServiceLoader) Preconditions.checkNotNull(pipelineExecutorServiceLoader);
        this.configuration = new Configuration((Configuration) Preconditions.checkNotNull(configuration));
        this.userClassloader = classLoader == null ? getClass().getClassLoader() : classLoader;
        configure(this.configuration, this.userClassloader);
    }

    /**
     * Returns the user class loader held by this environment.
     *
     * <p>Decompiler note: the access modifier was changed from {@code protected}.
     */
    public ClassLoader getUserClassloader() {
        return userClassloader;
    }

    /**
     * Returns the {@link ExecutionConfig} instance owned by this environment.
     */
    public ExecutionConfig getConfig() {
        return config;
    }

    /**
     * Returns the list of cached-file entries; the internal list itself is returned, not a copy.
     */
    public List<Tuple2<String, DistributedCache.DistributedCacheEntry>> getCachedFiles() {
        return cacheFile;
    }

    /**
     * Returns the registered job listeners; the internal list itself is returned, not a copy.
     */
    @PublicEvolving
    public List<JobListener> getJobListeners() {
        return jobListeners;
    }

    /**
     * Sets the parallelism on the underlying {@link ExecutionConfig}.
     *
     * @param i the parallelism to set
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment setParallelism(int i) {
        config.setParallelism(i);
        return this;
    }

    /**
     * Stores the runtime execution mode in this environment's configuration under
     * {@code ExecutionOptions.RUNTIME_MODE}.
     *
     * @param runtimeExecutionMode the mode to set; must not be null
     * @return this environment, for call chaining
     * @throws NullPointerException if {@code runtimeExecutionMode} is null
     */
    @PublicEvolving
    public StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode runtimeExecutionMode) {
        Preconditions.checkNotNull(runtimeExecutionMode);
        // The decompiler cast the enum value to ConfigOption, which is not a valid conversion;
        // the option and its value are passed directly.
        this.configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode);
        return this;
    }

    /**
     * Sets the maximum parallelism on the underlying {@link ExecutionConfig}.
     *
     * @param i the maximum parallelism; must satisfy {@code 0 < i <= 32768}
     * @return this environment, for call chaining
     * @throws IllegalArgumentException if {@code i} is outside the allowed range
     */
    public StreamExecutionEnvironment setMaxParallelism(int i) {
        if (i <= 0 || i > 32768) {
            throw new IllegalArgumentException("maxParallelism is out of bounds 0 < maxParallelism <= 32768. Found: " + i);
        }
        config.setMaxParallelism(i);
        return this;
    }

    /**
     * Records the resource profile of a slot sharing group under the group's name.
     *
     * <p>Groups whose extracted resource spec equals {@code ResourceSpec.UNKNOWN} are ignored.
     *
     * @param slotSharingGroup the group to register
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment registerSlotSharingGroup(SlotSharingGroup slotSharingGroup) {
        final ResourceSpec spec = SlotSharingGroupUtils.extractResourceSpec(slotSharingGroup);
        if (spec.equals(ResourceSpec.UNKNOWN)) {
            return this;
        }
        slotSharingGroupResources.put(slotSharingGroup.getName(), ResourceProfile.fromResourceSpec(spec, MemorySize.ZERO));
        return this;
    }

    /**
     * Returns the parallelism reported by the underlying {@link ExecutionConfig}.
     */
    public int getParallelism() {
        return config.getParallelism();
    }

    /**
     * Returns the maximum parallelism reported by the underlying {@link ExecutionConfig}.
     */
    public int getMaxParallelism() {
        return config.getMaxParallelism();
    }

    /**
     * Stores the buffer timeout.
     *
     * @param j the timeout; any value of -1 or greater is accepted
     * @return this environment, for call chaining
     * @throws IllegalArgumentException if {@code j} is smaller than -1
     */
    public StreamExecutionEnvironment setBufferTimeout(long j) {
        final boolean accepted = j >= -1;
        if (!accepted) {
            throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
        }
        bufferTimeout = j;
        return this;
    }

    /**
     * Returns the currently stored buffer timeout.
     */
    public long getBufferTimeout() {
        return bufferTimeout;
    }

    /**
     * Turns the operator chaining flag off for this environment.
     *
     * @return this environment, for call chaining
     */
    @PublicEvolving
    public StreamExecutionEnvironment disableOperatorChaining() {
        isChainingEnabled = false;
        return this;
    }

    /**
     * Reports whether the operator chaining flag is currently set.
     */
    @PublicEvolving
    public boolean isChainingEnabled() {
        return isChainingEnabled;
    }

    /**
     * Returns the {@link CheckpointConfig} instance owned by this environment.
     */
    public CheckpointConfig getCheckpointConfig() {
        return checkpointCfg;
    }

    /**
     * Sets the checkpoint interval on the checkpoint config.
     *
     * @param j the checkpoint interval
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment enableCheckpointing(long j) {
        checkpointCfg.setCheckpointInterval(j);
        return this;
    }

    /**
     * Sets the checkpointing mode and then the checkpoint interval on the checkpoint config.
     *
     * @param j the checkpoint interval
     * @param checkpointingMode the checkpointing mode
     * @return this environment, for call chaining
     */
    public StreamExecutionEnvironment enableCheckpointing(long j, CheckpointingMode checkpointingMode) {
        final CheckpointConfig target = checkpointCfg;
        target.setCheckpointingMode(checkpointingMode);
        target.setCheckpointInterval(j);
        return this;
    }

    /**
     * Sets the checkpointing mode, the checkpoint interval and the force-checkpointing flag, in
     * that order, on the checkpoint config.
     *
     * @param j the checkpoint interval
     * @param checkpointingMode the checkpointing mode
     * @param z value for the force-checkpointing flag
     * @return this environment, for call chaining
     */
    @PublicEvolving
    @Deprecated
    public StreamExecutionEnvironment enableCheckpointing(long j, CheckpointingMode checkpointingMode, boolean z) {
        final CheckpointConfig target = checkpointCfg;
        target.setCheckpointingMode(checkpointingMode);
        target.setCheckpointInterval(j);
        target.setForceCheckpointing(z);
        return this;
    }

    /**
     * Sets the checkpoint interval to the fixed value 500.
     *
     * @return this environment, for call chaining
     */
    @PublicEvolving
    @Deprecated
    public StreamExecutionEnvironment enableCheckpointing() {
        checkpointCfg.setCheckpointInterval(500L);
        return this;
    }

    /**
     * Returns the checkpoint interval reported by the checkpoint config.
     */
    public long getCheckpointInterval() {
        return checkpointCfg.getCheckpointInterval();
    }

    /**
     * Returns the force-checkpointing flag reported by the checkpoint config.
     */
    @PublicEvolving
    @Deprecated
    public boolean isForceCheckpointing() {
        return checkpointCfg.isForceCheckpointing();
    }

    /**
     * Returns the unaligned-checkpoints flag reported by the checkpoint config.
     */
    @PublicEvolving
    public boolean isUnalignedCheckpointsEnabled() {
        return checkpointCfg.isUnalignedCheckpointsEnabled();
    }

    /**
     * Returns the force-unaligned-checkpoints flag reported by the checkpoint config.
     */
    @PublicEvolving
    public boolean isForceUnalignedCheckpoints() {
        return checkpointCfg.isForceUnalignedCheckpoints();
    }

    /**
     * Returns the checkpointing mode reported by the checkpoint config.
     */
    public CheckpointingMode getCheckpointingMode() {
        return checkpointCfg.getCheckpointingMode();
    }

    /**
     * Stores the given state backend as this environment's default.
     *
     * @param stateBackend the state backend; must not be null
     * @return this environment, for call chaining
     * @throws NullPointerException if {@code stateBackend} is null
     */
    @PublicEvolving
    public StreamExecutionEnvironment setStateBackend(StateBackend stateBackend) {
        defaultStateBackend = Preconditions.checkNotNull(stateBackend);
        return this;
    }

    /**
     * Returns the stored default state backend, which is {@code null} until one has been set.
     */
    @PublicEvolving
    public StateBackend getStateBackend() {
        return defaultStateBackend;
    }

    /**
     * Sets the default savepoint directory from a path string.
     *
     * @param str the directory as a string; must not be null
     * @return this environment, for call chaining
     * @throws NullPointerException if {@code str} is null
     */
    @PublicEvolving
    public StreamExecutionEnvironment setDefaultSavepointDirectory(String str) {
        final Path directory = new Path(Preconditions.checkNotNull(str));
        return setDefaultSavepointDirectory(directory);
    }

    /**
     * Sets the default savepoint directory from a URI.
     *
     * @param uri the directory as a URI; must not be null
     * @return this environment, for call chaining
     * @throws NullPointerException if {@code uri} is null
     */
    @PublicEvolving
    public StreamExecutionEnvironment setDefaultSavepointDirectory(URI uri) {
        final Path directory = new Path(Preconditions.checkNotNull(uri));
        return setDefaultSavepointDirectory(directory);
    }

    /**
     * Stores the given path as the default savepoint directory.
     *
     * @param path the directory; must not be null
     * @return this environment, for call chaining
     * @throws NullPointerException if {@code path} is null
     */
    @PublicEvolving
    public StreamExecutionEnvironment setDefaultSavepointDirectory(Path path) {
        defaultSavepointDirectory = Preconditions.checkNotNull(path);
        return this;
    }

    /**
     * Returns the stored default savepoint directory, or {@code null} if none has been set.
     */
    @PublicEvolving
    @Nullable
    public Path getDefaultSavepointDirectory() {
        return defaultSavepointDirectory;
    }

    /**
     * Passes the restart strategy configuration on to the underlying {@link ExecutionConfig}.
     *
     * @param restartStrategyConfiguration the restart strategy configuration to apply
     */
    @PublicEvolving
    public void setRestartStrategy(RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) {
        config.setRestartStrategy(restartStrategyConfiguration);
    }

    /**
     * Returns the restart strategy configuration reported by the underlying {@link ExecutionConfig}.
     */
    @PublicEvolving
    public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
        return config.getRestartStrategy();
    }

    /**
     * Passes the number of execution retries on to the underlying {@link ExecutionConfig}.
     *
     * @param i the number of execution retries
     */
    @PublicEvolving
    @Deprecated
    public void setNumberOfExecutionRetries(int i) {
        config.setNumberOfExecutionRetries(i);
    }

    /**
     * Returns the number of execution retries reported by the underlying {@link ExecutionConfig}.
     */
    @PublicEvolving
    @Deprecated
    public int getNumberOfExecutionRetries() {
        return config.getNumberOfExecutionRetries();
    }

    /**
     * Adds a default Kryo serializer instance for the given type by delegating to the underlying
     * {@link ExecutionConfig}.
     *
     * <p>The type bound restores the signature recorded in the class file
     * ({@code T extends Serializer<?> & Serializable}); the decompiler had degraded it to raw
     * types. The erasure of the method is unchanged.
     *
     * @param cls the class for which the serializer is added
     * @param serializer the serializer instance; must also be {@link java.io.Serializable}
     * @param <T> the serializer type
     */
    public <T extends Serializer<?> & java.io.Serializable> void addDefaultKryoSerializer(Class<?> cls, T serializer) {
        this.config.addDefaultKryoSerializer(cls, serializer);
    }

    /**
     * Adds a default Kryo serializer class for the given type by delegating to the underlying
     * {@link ExecutionConfig}.
     *
     * @param cls the class for which the serializer is added
     * @param cls2 the serializer class
     */
    public void addDefaultKryoSerializer(Class<?> cls, Class<? extends Serializer<?>> cls2) {
        config.addDefaultKryoSerializer(cls, cls2);
    }

    /**
     * Registers a type together with a Kryo serializer instance by delegating to the underlying
     * {@link ExecutionConfig}.
     *
     * <p>The type bound restores the signature recorded in the class file
     * ({@code T extends Serializer<?> & Serializable}); the decompiler had degraded it to raw
     * types. The erasure of the method is unchanged.
     *
     * @param cls the class to register
     * @param serializer the serializer instance; must also be {@link java.io.Serializable}
     * @param <T> the serializer type
     */
    public <T extends Serializer<?> & java.io.Serializable> void registerTypeWithKryoSerializer(Class<?> cls, T serializer) {
        this.config.registerTypeWithKryoSerializer(cls, serializer);
    }

    /**
     * Registers a type together with a Kryo serializer class by delegating to the underlying
     * {@link ExecutionConfig}.
     *
     * @param cls the class to register
     * @param cls2 the serializer class
     */
    public void registerTypeWithKryoSerializer(Class<?> cls, Class<? extends Serializer> cls2) {
        config.registerTypeWithKryoSerializer(cls, cls2);
    }

    /**
     * Registers a type with the underlying {@link ExecutionConfig}.
     *
     * <p>The type is registered as a POJO type when its extracted type information is a
     * {@link PojoTypeInfo}, and as a Kryo type otherwise.
     *
     * @param cls the class to register
     * @throws NullPointerException if {@code cls} is null
     */
    public void registerType(Class<?> cls) {
        if (cls == null) {
            throw new NullPointerException("Cannot register null type class.");
        }
        final TypeInformation<?> extracted = TypeExtractor.createTypeInfo(cls);
        if (!(extracted instanceof PojoTypeInfo)) {
            config.registerKryoType(cls);
            return;
        }
        config.registerPojoType(cls);
    }

    /**
     * Stores the time characteristic and adjusts the auto-watermark interval to match.
     *
     * <p>The interval becomes 0 for {@code ProcessingTime} and 200 for every other value.
     *
     * @param timeCharacteristic the time characteristic; must not be null
     * @throws NullPointerException if {@code timeCharacteristic} is null
     */
    @PublicEvolving
    @Deprecated
    public void setStreamTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(timeCharacteristic);
        final long watermarkInterval = timeCharacteristic == TimeCharacteristic.ProcessingTime ? 0L : 200L;
        getConfig().setAutoWatermarkInterval(watermarkInterval);
    }

    /**
     * Returns the currently stored time characteristic.
     */
    @PublicEvolving
    @Deprecated
    public TimeCharacteristic getStreamTimeCharacteristic() {
        return timeCharacteristic;
    }

    /**
     * Applies the given configuration using this environment's user class loader.
     *
     * @param readableConfig the configuration to read from
     */
    @PublicEvolving
    public void configure(ReadableConfig readableConfig) {
        configure(readableConfig, userClassloader);
    }

    /**
     * Applies every option that is present in {@code readableConfig} to this environment.
     *
     * <p>Options that are absent leave the current setting untouched. Time characteristic, state
     * backend, chaining flag, buffer timeout, job listeners and cached files update this
     * environment directly; runtime mode, batch shuffle mode, sort-inputs, batch state backend,
     * pipeline name and checkpoints-after-tasks-finish are copied into this environment's own
     * configuration. Finally the execution config and the checkpoint config configure themselves
     * from the same source.
     *
     * @param readableConfig the configuration to read from
     * @param classLoader the class loader used to load the state backend and job listeners
     */
    @PublicEvolving
    public void configure(ReadableConfig readableConfig, ClassLoader classLoader) {
        readableConfig.getOptional(StreamPipelineOptions.TIME_CHARACTERISTIC).ifPresent(this::setStreamTimeCharacteristic);
        Optional.ofNullable(loadStateBackend(readableConfig, classLoader)).ifPresent(this::setStateBackend);
        readableConfig.getOptional(PipelineOptions.OPERATOR_CHAINING).ifPresent(enabled -> {
            this.isChainingEnabled = enabled;
        });
        readableConfig.getOptional(ExecutionOptions.BUFFER_TIMEOUT).ifPresent(timeout -> {
            setBufferTimeout(timeout.toMillis());
        });
        readableConfig.getOptional(DeploymentOptions.JOB_LISTENERS).ifPresent(listenerClassNames -> {
            registerCustomListeners(classLoader, listenerClassNames);
        });
        readableConfig.getOptional(PipelineOptions.CACHED_FILES).ifPresent(files -> {
            // Configured cached files replace, rather than extend, the current entries.
            this.cacheFile.clear();
            this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(files));
        });

        // The decompiler emitted casts of each value to ConfigOption here; those are not valid
        // conversions, so every option is set with its plain value instead.
        readableConfig.getOptional(ExecutionOptions.RUNTIME_MODE).ifPresent(mode -> {
            this.configuration.set(ExecutionOptions.RUNTIME_MODE, mode);
        });
        readableConfig.getOptional(ExecutionOptions.BATCH_SHUFFLE_MODE).ifPresent(shuffleMode -> {
            this.configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, shuffleMode);
        });
        readableConfig.getOptional(ExecutionOptions.SORT_INPUTS).ifPresent(sortInputs -> {
            this.configuration.set(ExecutionOptions.SORT_INPUTS, sortInputs);
        });
        readableConfig.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent(useBatchBackend -> {
            this.configuration.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, useBatchBackend);
        });
        readableConfig.getOptional(PipelineOptions.NAME).ifPresent(jobName -> {
            this.configuration.set(PipelineOptions.NAME, jobName);
        });
        readableConfig.getOptional(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH).ifPresent(flag -> {
            this.configuration.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, flag);
        });

        this.config.configure(readableConfig, classLoader);
        this.checkpointCfg.configure(readableConfig);
    }

    /**
     * Instantiates each named {@link JobListener} class and appends it to the listener list.
     *
     * @param classLoader the class loader used for instantiation
     * @param list the listener class names
     * @throws WrappingRuntimeException if a listener cannot be instantiated
     */
    private void registerCustomListeners(ClassLoader classLoader, List<String> list) {
        for (String listenerClassName : list) {
            final JobListener listener;
            try {
                listener = InstantiationUtil.instantiate(listenerClassName, JobListener.class, classLoader);
            } catch (FlinkException cause) {
                throw new WrappingRuntimeException("Could not load JobListener : " + listenerClassName, cause);
            }
            jobListeners.add(listener);
        }
    }

    /**
     * Loads a state backend from the configuration via {@link StateBackendLoader}.
     *
     * @param readableConfig the configuration to read from
     * @param classLoader the class loader passed to the loader
     * @return whatever the loader returns; the caller treats {@code null} as "nothing configured"
     * @throws WrappingRuntimeException if loading fails with an I/O or dynamic-loading error
     */
    private StateBackend loadStateBackend(ReadableConfig readableConfig, ClassLoader classLoader) {
        final StateBackend loaded;
        try {
            loaded = StateBackendLoader.loadStateBackendFromConfig(readableConfig, classLoader, null);
        } catch (IOException | DynamicCodeLoadingException cause) {
            throw new WrappingRuntimeException(cause);
        }
        return loaded;
    }

    /**
     * Creates a source emitting the numbers from {@code j} to {@code j2}, backed by a
     * {@link StatefulSequenceSource}.
     *
     * @param j the start of the sequence
     * @param j2 the end of the sequence
     * @return the resulting data stream source
     * @throws IllegalArgumentException if {@code j} is greater than {@code j2}
     */
    @Deprecated
    public DataStreamSource<Long> generateSequence(long j, long j2) {
        final boolean ordered = j <= j2;
        if (!ordered) {
            throw new IllegalArgumentException("Start of sequence must not be greater than the end");
        }
        final StatefulSequenceSource sequence = new StatefulSequenceSource(j, j2);
        return addSource(sequence, "Sequence Source (Deprecated)");
    }

    /**
     * Creates a source emitting the numbers from {@code j} to {@code j2}, backed by a
     * {@link NumberSequenceSource} with no watermarks.
     *
     * @param j the start of the sequence
     * @param j2 the end of the sequence
     * @return the resulting data stream source
     * @throws IllegalArgumentException if {@code j} is greater than {@code j2}
     */
    public DataStreamSource<Long> fromSequence(long j, long j2) {
        final boolean ordered = j <= j2;
        if (!ordered) {
            throw new IllegalArgumentException("Start of sequence must not be greater than the end");
        }
        final NumberSequenceSource sequence = new NumberSequenceSource(j, j2);
        return fromSource(sequence, WatermarkStrategy.noWatermarks(), "Sequence Source");
    }

    /**
     * Creates a source from the given elements, deriving the type information from the first one.
     *
     * <p>Only the type extraction is guarded by the try block, so failures raised while building
     * the source itself are propagated unchanged instead of being reported as type-extraction
     * problems.
     *
     * @param outArr the elements; at least one is required
     * @param <OUT> the element type
     * @return the resulting data stream source
     * @throws IllegalArgumentException if no element is given
     * @throws RuntimeException if no type information can be derived from the first element
     */
    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(OUT... outArr) {
        if (outArr.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(outArr[0]);
        } catch (Exception e) {
            // The hint names fromCollection(Collection, TypeInformation), the overload that
            // actually accepts explicit type information.
            throw new RuntimeException("Could not create TypeInformation for type " + outArr[0].getClass().getName() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        return fromCollection(Arrays.asList(outArr), typeInfo);
    }

    /**
     * Creates a source from the given elements, deriving the type information from {@code cls}.
     *
     * <p>Only the type extraction is guarded by the try block, so failures raised while building
     * the source itself are propagated unchanged instead of being reported as type-extraction
     * problems.
     *
     * @param cls the element class used for type extraction
     * @param outArr the elements; at least one is required
     * @param <OUT> the element type
     * @return the resulting data stream source
     * @throws IllegalArgumentException if no element is given
     * @throws RuntimeException if no type information can be derived from {@code cls}
     */
    @SafeVarargs
    public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> cls, OUT... outArr) {
        if (outArr.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForClass(cls);
        } catch (Exception e) {
            // The hint names fromCollection(Collection, TypeInformation), the overload that
            // actually accepts explicit type information.
            throw new RuntimeException("Could not create TypeInformation for type " + cls.getName() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        return fromCollection(Arrays.asList(outArr), typeInfo);
    }

    /**
     * Creates a source from a non-empty collection, deriving the type information from its first
     * element.
     *
     * <p>Only the type extraction is guarded by the try block, so failures raised while building
     * the source itself are propagated unchanged instead of being reported as type-extraction
     * problems.
     *
     * @param collection the elements; must be non-null, non-empty and start with a non-null element
     * @param <OUT> the element type
     * @return the resulting data stream source
     * @throws NullPointerException if {@code collection} is null
     * @throws IllegalArgumentException if the collection is empty or its first element is null
     * @throws RuntimeException if no type information can be derived from the first element
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> collection) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        if (collection.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        OUT first = collection.iterator().next();
        if (first == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getForObject(first);
        } catch (Exception e) {
            // The hint names fromCollection(Collection, TypeInformation), the overload that
            // actually accepts explicit type information.
            throw new RuntimeException("Could not create TypeInformation for type " + first.getClass() + "; please specify the TypeInformation manually via StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        return fromCollection(collection, typeInfo);
    }

    /**
     * Creates a bounded source with parallelism 1 from the given collection and type information.
     *
     * <p>The collection is checked against the type class of {@code typeInformation} before the
     * source is created.
     *
     * @param collection the elements; must not be null
     * @param typeInformation the type information of the elements
     * @param <OUT> the element type
     * @return the resulting data stream source
     * @throws NullPointerException if {@code collection} is null
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> collection, TypeInformation<OUT> typeInformation) {
        Preconditions.checkNotNull(collection, "Collection must not be null");
        FromElementsFunction.checkCollection(collection, typeInformation.getTypeClass());
        final SourceFunction<OUT> elements = new FromElementsFunction<>(collection);
        final DataStreamSource<OUT> stream = addSource(elements, "Collection Source", typeInformation, Boundedness.BOUNDED);
        return stream.setParallelism(1);
    }

    /**
     * Creates a source from an iterator, deriving the type information from {@code cls}.
     *
     * @param it the iterator supplying the elements
     * @param cls the element class used for type extraction
     * @param <OUT> the element type
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> it, Class<OUT> cls) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(cls);
        return fromCollection(it, typeInfo);
    }

    /**
     * Creates a bounded source from an iterator and explicit type information.
     *
     * @param it the iterator supplying the elements; must not be null
     * @param typeInformation the type information of the elements
     * @param <OUT> the element type
     * @return the resulting data stream source
     * @throws NullPointerException if {@code it} is null
     */
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> it, TypeInformation<OUT> typeInformation) {
        Preconditions.checkNotNull(it, "The iterator must not be null");
        final SourceFunction<OUT> iteratorSource = new FromIteratorFunction<>(it);
        return addSource(iteratorSource, "Collection Source", typeInformation, Boundedness.BOUNDED);
    }

    /**
     * Creates a source from a splittable iterator, deriving the type information from {@code cls}.
     *
     * @param splittableIterator the iterator supplying the elements
     * @param cls the element class used for type extraction
     * @param <OUT> the element type
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> splittableIterator, Class<OUT> cls) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getForClass(cls);
        return fromParallelCollection(splittableIterator, typeInfo);
    }

    /**
     * Creates a source named "Parallel Collection Source" from a splittable iterator and explicit
     * type information.
     *
     * @param splittableIterator the iterator supplying the elements
     * @param typeInformation the type information of the elements
     * @param <OUT> the element type
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> splittableIterator, TypeInformation<OUT> typeInformation) {
        final String sourceName = "Parallel Collection Source";
        return fromParallelCollection(splittableIterator, typeInformation, sourceName);
    }

    /**
     * Creates a bounded source with the given name from a splittable iterator.
     *
     * @param splittableIterator the iterator supplying the elements
     * @param typeInformation the type information of the elements
     * @param str the name of the source
     * @param <OUT> the element type
     * @return the resulting data stream source
     */
    private <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> splittableIterator, TypeInformation<OUT> typeInformation, String str) {
        final SourceFunction<OUT> splitSource = new FromSplittableIteratorFunction<>(splittableIterator);
        return addSource(splitSource, str, typeInformation, Boundedness.BOUNDED);
    }

    /**
     * Reads the given file as text using the charset name "UTF-8".
     *
     * @param str the file path
     * @return the resulting data stream source of lines
     */
    public DataStreamSource<String> readTextFile(String str) {
        final String charsetName = "UTF-8";
        return readTextFile(str, charsetName);
    }

    /**
     * Reads the given file once as text, using the given charset name and the default file
     * path filter.
     *
     * @param str the file path; must not be null or blank
     * @param str2 the charset name handed to the text input format
     * @return the resulting data stream source of lines
     * @throws IllegalArgumentException if {@code str} is null or blank
     */
    public DataStreamSource<String> readTextFile(String str, String str2) {
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(str), "The file path must not be null or blank.");

        final TextInputFormat format = new TextInputFormat(new Path(str));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());
        format.setCharsetName(str2);

        return readFile(format, str, FileProcessingMode.PROCESS_ONCE, -1L, BasicTypeInfo.STRING_TYPE_INFO);
    }

    /**
     * Reads the given file once with the supplied input format.
     *
     * @param fileInputFormat the input format used to read the file
     * @param str the file path
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> fileInputFormat, String str) {
        final FileProcessingMode mode = FileProcessingMode.PROCESS_ONCE;
        return readFile(fileInputFormat, str, mode, -1L);
    }

    /**
     * Reads a file with the supplied input format after installing the given path filter on it.
     *
     * <p>The produced type is derived from the input format. Only that derivation is guarded, so
     * validation failures of the delegated call are no longer reported as type-extraction
     * problems, and the original cause of an extraction failure is preserved.
     *
     * @param fileInputFormat the input format used to read the file
     * @param str the file path
     * @param fileProcessingMode the processing mode
     * @param j the path monitoring interval
     * @param filePathFilter the filter installed on the input format
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     * @throws InvalidProgramException if the produced type cannot be determined automatically
     */
    @PublicEvolving
    @Deprecated
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> fileInputFormat, String str, FileProcessingMode fileProcessingMode, long j, FilePathFilter filePathFilter) {
        fileInputFormat.setFilesFilter(filePathFilter);
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getInputFormatTypes(fileInputFormat);
        } catch (Exception e) {
            throw new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }
        return readFile(fileInputFormat, str, fileProcessingMode, j, typeInfo);
    }

    /**
     * Reads a file with the supplied input format, deriving the produced type from the format.
     *
     * <p>Only the type derivation is guarded, so validation failures of the delegated call are
     * no longer reported as type-extraction problems, and the original cause of an extraction
     * failure is preserved.
     *
     * @param fileInputFormat the input format used to read the file
     * @param str the file path
     * @param fileProcessingMode the processing mode
     * @param j the path monitoring interval
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     * @throws InvalidProgramException if the produced type cannot be determined automatically
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> fileInputFormat, String str, FileProcessingMode fileProcessingMode, long j) {
        final TypeInformation<OUT> typeInfo;
        try {
            typeInfo = TypeExtractor.getInputFormatTypes(fileInputFormat);
        } catch (Exception e) {
            throw new InvalidProgramException("The type returned by the input format could not be automatically determined. Please specify the TypeInformation of the produced type explicitly by using the 'createInput(InputFormat, TypeInformation)' method instead.", e);
        }
        return readFile(fileInputFormat, str, fileProcessingMode, j, typeInfo);
    }

    /**
     * Creates a stream of strings by adding a {@link FileMonitoringFunction} source named
     * "Read File Stream source" and flat-mapping its output through a {@link FileReadFunction}.
     *
     * @param str the path handed to the monitoring function
     * @param j the interval handed to the monitoring function
     * @param watchType the watch type handed to the monitoring function
     * @return the resulting data stream
     */
    @Deprecated
    public DataStream<String> readFileStream(String str, long j, FileMonitoringFunction.WatchType watchType) {
        final FileMonitoringFunction monitor = new FileMonitoringFunction(str, j, watchType);
        return addSource(monitor, "Read File Stream source")
                .flatMap(new FileReadFunction());
    }

    /**
     * Reads a file with the supplied input format and explicit type information.
     *
     * <p>The file path is set on the input format before the file source is created under the
     * name "Custom File Source".
     *
     * @param fileInputFormat the input format; must not be null
     * @param str the file path; must not be null or blank
     * @param fileProcessingMode the processing mode
     * @param j the path monitoring interval
     * @param typeInformation the type information of the produced elements
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     * @throws NullPointerException if {@code fileInputFormat} is null
     * @throws IllegalArgumentException if {@code str} is null or blank
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> fileInputFormat, String str, FileProcessingMode fileProcessingMode, long j, TypeInformation<OUT> typeInformation) {
        Preconditions.checkNotNull(fileInputFormat, "InputFormat must not be null.");
        final boolean blankPath = StringUtils.isNullOrWhitespaceOnly(str);
        Preconditions.checkArgument(!blankPath, "The file path must not be null or blank.");

        fileInputFormat.setFilePath(str);
        final String sourceName = "Custom File Source";
        return createFileInput(fileInputFormat, typeInformation, sourceName, fileProcessingMode, j);
    }

    /**
     * Variant taking a single-character delimiter; the character is converted to a string and
     * passed to the string-delimiter overload.
     *
     * @param str the host name
     * @param i the port
     * @param c the delimiter character
     * @param j the maximum-retry argument passed on to the socket source
     * @return the resulting data stream source
     */
    @Deprecated
    public DataStreamSource<String> socketTextStream(String str, int i, char c, long j) {
        final String delimiter = String.valueOf(c);
        return socketTextStream(str, i, delimiter, j);
    }

    /**
     * Adds a {@link SocketTextStreamFunction} source named "Socket Stream".
     *
     * @param str the host name
     * @param i the port
     * @param str2 the delimiter string
     * @param j the fourth constructor argument of the socket function
     * @return the resulting data stream source
     */
    @PublicEvolving
    public DataStreamSource<String> socketTextStream(String str, int i, String str2, long j) {
        final SocketTextStreamFunction socketSource = new SocketTextStreamFunction(str, i, str2, j);
        return addSource(socketSource, "Socket Stream");
    }

    /**
     * Variant taking a single-character delimiter and passing 0 as the last argument.
     *
     * @param str the host name
     * @param i the port
     * @param c the delimiter character
     * @return the resulting data stream source
     */
    @Deprecated
    public DataStreamSource<String> socketTextStream(String str, int i, char c) {
        final long lastArgument = 0L;
        return socketTextStream(str, i, c, lastArgument);
    }

    /**
     * Variant taking a string delimiter and passing 0 as the last argument.
     *
     * @param str the host name
     * @param i the port
     * @param str2 the delimiter string
     * @return the resulting data stream source
     */
    @PublicEvolving
    public DataStreamSource<String> socketTextStream(String str, int i, String str2) {
        final long lastArgument = 0L;
        return socketTextStream(str, i, str2, lastArgument);
    }

    /**
     * Variant using the newline character as delimiter.
     *
     * @param str the host name
     * @param i the port
     * @return the resulting data stream source
     */
    @PublicEvolving
    public DataStreamSource<String> socketTextStream(String str, int i) {
        final String newline = "\n";
        return socketTextStream(str, i, newline);
    }

    /**
     * Creates a source from an input format, deriving the produced type from the format.
     *
     * @param inputFormat the input format to read from
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat) {
        final TypeInformation<OUT> typeInfo = TypeExtractor.getInputFormatTypes(inputFormat);
        return createInput(inputFormat, typeInfo);
    }

    /**
     * Creates a source from an input format and explicit type information.
     *
     * <p>A {@link FileInputFormat} is read once through the file-input path under the name
     * "Custom File source"; any other format becomes a generic source named "Custom Source".
     *
     * @param inputFormat the input format to read from
     * @param typeInformation the type information of the produced elements
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    @PublicEvolving
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInformation) {
        if (inputFormat instanceof FileInputFormat) {
            @SuppressWarnings("unchecked")
            final FileInputFormat<OUT> fileFormat = (FileInputFormat<OUT>) inputFormat;
            return createFileInput(fileFormat, typeInformation, "Custom File source", FileProcessingMode.PROCESS_ONCE, -1L);
        }
        return createInput(inputFormat, typeInformation, "Custom Source");
    }

    /**
     * Wraps the input format in an {@link InputFormatSourceFunction} and adds it as a source.
     *
     * @param inputFormat the input format to read from
     * @param typeInformation the type information of the produced elements
     * @param str the name of the source
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    private <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInformation, String str) {
        final InputFormatSourceFunction<OUT> formatSource = new InputFormatSourceFunction<>(inputFormat, typeInformation);
        return addSource(formatSource, str, typeInformation);
    }

    /**
     * Builds a file source as a {@link ContinuousFileMonitoringFunction} source followed by a
     * "Split Reader" transformation created from a {@link ContinuousFileReaderOperatorFactory}.
     *
     * <p>The monitoring source is bounded in {@code PROCESS_ONCE} mode and continuously unbounded
     * otherwise.
     *
     * @param fileInputFormat the input format; must not be null
     * @param typeInformation the type information of the produced elements; must not be null
     * @param str the name of the source; must not be null
     * @param fileProcessingMode the processing mode; must not be null
     * @param j the path monitoring interval; must be at least 1 unless the mode is PROCESS_ONCE
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if the interval is too small for the chosen mode
     */
    private <OUT> DataStreamSource<OUT> createFileInput(FileInputFormat<OUT> fileInputFormat, TypeInformation<OUT> typeInformation, String str, FileProcessingMode fileProcessingMode, long j) {
        Preconditions.checkNotNull(fileInputFormat, "Unspecified file input format.");
        Preconditions.checkNotNull(typeInformation, "Unspecified output type information.");
        Preconditions.checkNotNull(str, "Unspecified name for the source.");
        Preconditions.checkNotNull(fileProcessingMode, "Unspecified monitoring mode.");

        final boolean intervalAcceptable = fileProcessingMode.equals(FileProcessingMode.PROCESS_ONCE) || j >= 1;
        Preconditions.checkArgument(intervalAcceptable, "The path monitoring interval cannot be less than 1 ms.");

        final Boundedness boundedness;
        if (fileProcessingMode == FileProcessingMode.PROCESS_ONCE) {
            boundedness = Boundedness.BOUNDED;
        } else {
            boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        }

        // The raw-typed expression is kept as the decompiler produced it so that overload
        // resolution and generic inference stay exactly as before.
        return new DataStreamSource<>(addSource(new ContinuousFileMonitoringFunction(fileInputFormat, fileProcessingMode, getParallelism(), j), str, null, boundedness).transform("Split Reader: " + str, typeInformation, new ContinuousFileReaderOperatorFactory(fileInputFormat)));
    }

    /**
     * Adds a source function under the name "Custom Source".
     *
     * @param sourceFunction the source function
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
        final String sourceName = "Custom Source";
        return addSource(sourceFunction, sourceName);
    }

    /**
     * Adds a source function under the given name, without explicit type information.
     *
     * @param sourceFunction the source function
     * @param str the name of the source
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction, String str) {
        // The null literal is passed directly so overload resolution stays as in the original call.
        return addSource(
                sourceFunction,
                str,
                null);
    }

    /**
     * Adds a source function under the name "Custom Source" with explicit type information.
     *
     * @param sourceFunction the source function
     * @param typeInformation the type information of the produced elements
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction, TypeInformation<OUT> typeInformation) {
        final String sourceName = "Custom Source";
        return addSource(sourceFunction, sourceName, typeInformation);
    }

    /**
     * Adds a source function with the given name and type information as a continuously
     * unbounded source.
     *
     * @param sourceFunction the source function
     * @param str the name of the source
     * @param typeInformation the type information of the produced elements
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction, String str, TypeInformation<OUT> typeInformation) {
        final Boundedness boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
        return addSource(sourceFunction, str, typeInformation, boundedness);
    }

    /**
     * Cleans the source function, resolves its type information and wraps it in a
     * {@link StreamSource} operator.
     *
     * <p>The source is flagged as parallel when the function is a {@link ParallelSourceFunction}.
     *
     * @param sourceFunction the source function; must not be null
     * @param str the name of the source; must not be null
     * @param typeInformation explicit type information, or {@code null} to have it resolved
     * @param boundedness the boundedness of the source; must not be null
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     * @throws NullPointerException if a required argument is null
     */
    private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction, String str, @Nullable TypeInformation<OUT> typeInformation, Boundedness boundedness) {
        Preconditions.checkNotNull(sourceFunction);
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(boundedness);

        clean(sourceFunction);
        final TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(sourceFunction, str, SourceFunction.class, typeInformation);
        final StreamSource<OUT, ?> operator = new StreamSource<>(sourceFunction);
        final boolean parallel = sourceFunction instanceof ParallelSourceFunction;

        return new DataStreamSource<>(this, resolvedTypeInfo, operator, parallel, str, boundedness);
    }

    /**
     * Adds a {@link Source} with the given watermark strategy and name, without explicit type
     * information.
     *
     * @param source the source
     * @param watermarkStrategy the watermark strategy
     * @param str the name of the source
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     */
    @Experimental
    public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy, String str) {
        // The null literal is passed directly so overload resolution stays as in the original call.
        return fromSource(
                source,
                watermarkStrategy,
                str,
                null);
    }

    /**
     * Adds a {@link Source} with the given watermark strategy, name and type information.
     *
     * <p>The arguments are validated in the order source, watermark strategy, resolved type
     * information, name.
     *
     * @param source the source; must not be null
     * @param watermarkStrategy the watermark strategy; must not be null
     * @param str the name of the source; must not be null
     * @param typeInformation explicit type information, or {@code null} to have it resolved
     * @param <OUT> the type of the produced elements
     * @return the resulting data stream source
     * @throws NullPointerException if a required argument or the resolved type information is null
     */
    @Experimental
    public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> watermarkStrategy, String str, TypeInformation<OUT> typeInformation) {
        final Source<OUT, ?, ?> checkedSource = Preconditions.checkNotNull(source, "source");
        final WatermarkStrategy<OUT> checkedStrategy = Preconditions.checkNotNull(watermarkStrategy, "timestampsAndWatermarks");
        final TypeInformation<OUT> resolvedTypeInfo = Preconditions.checkNotNull(getTypeInfo(source, str, Source.class, typeInformation));
        final String checkedName = Preconditions.checkNotNull(str);
        return new DataStreamSource<>(this, checkedSource, checkedStrategy, resolvedTypeInfo, checkedName);
    }

    /**
     * Builds the stream graph from the currently registered transformations and executes it.
     *
     * <p>The graph comes from {@link #getStreamGraph()}, which clears the registered
     * transformations once the graph has been generated.
     *
     * @return the result of the job execution
     * @throws Exception if graph generation or job execution fails
     */
    public JobExecutionResult execute() throws Exception {
        return this.execute(this.getStreamGraph());
    }

    /**
     * Builds the stream graph, assigns it the given job name and executes it.
     *
     * @param str name of the job; must not be null
     * @return the result of the job execution
     * @throws Exception if graph generation or job execution fails
     */
    public JobExecutionResult execute(String str) throws Exception {
        // Validate the name before the graph is generated (generation clears the transformations).
        Preconditions.checkNotNull(str, "Streaming Job name should not be null.");

        final StreamGraph namedGraph = this.getStreamGraph();
        namedGraph.setJobName(str);

        return this.execute(namedGraph);
    }

    /**
     * Submits the given stream graph and produces the job's execution result.
     *
     * <p>When {@code DeploymentOptions.ATTACHED} is set in the configuration, this blocks on
     * the job client's result future; otherwise a {@code DetachedJobExecutionResult} carrying
     * only the job id is returned. Every registered job listener is notified through
     * {@code onJobExecuted}, with either the result or the failure.
     *
     * <p>Note that the success notification runs inside the {@code try} block, so a listener
     * that throws there causes all listeners to be notified a second time with that failure.
     *
     * @param streamGraph the graph to execute
     * @return the execution result (attached) or a detached placeholder result
     * @throws Exception if submission or execution fails; an {@code ExecutionException}
     *     wrapper is stripped before rethrowing
     */
    @Internal
    public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
        final JobClient jobClient = executeAsync(streamGraph);
        try {
            // Declared as the common supertype: the attached branch yields a plain
            // JobExecutionResult, which is not assignable to DetachedJobExecutionResult.
            final JobExecutionResult jobExecutionResult;
            if (this.configuration.getBoolean(DeploymentOptions.ATTACHED)) {
                jobExecutionResult = jobClient.getJobExecutionResult().get();
            } else {
                jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
            }

            this.jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
            return jobExecutionResult;
        } catch (Throwable t) {
            // Future#get wraps failures in an ExecutionException; report the real cause.
            final Throwable strippedException = ExceptionUtils.stripExecutionException(t);
            this.jobListeners.forEach(jobListener -> jobListener.onJobExecuted(null, strippedException));
            ExceptionUtils.rethrowException(strippedException);

            // Never reached: rethrowException always throws. Present only to satisfy javac.
            return null;
        }
    }

    /**
     * Registers a listener that is notified about job submission and job execution.
     *
     * @param jobListener the listener to add; must not be null
     */
    @PublicEvolving
    public void registerJobListener(JobListener jobListener) {
        // checkNotNull returns its argument, so validation and registration fit in one call.
        this.jobListeners.add(Preconditions.checkNotNull(jobListener, "JobListener cannot be null"));
    }

    /** Removes every job listener that was previously registered on this environment. */
    @PublicEvolving
    public void clearJobListeners() {
        jobListeners.clear();
    }

    /**
     * Builds the stream graph from the registered transformations and submits it without
     * waiting for the job to finish.
     *
     * @return a client for the submitted job
     * @throws Exception if graph generation or job submission fails
     */
    @PublicEvolving
    public final JobClient executeAsync() throws Exception {
        return this.executeAsync(this.getStreamGraph());
    }

    /**
     * Builds the stream graph, assigns it the given job name and submits it without waiting
     * for the job to finish.
     *
     * @param str name of the job; must not be null
     * @return a client for the submitted job
     * @throws Exception if graph generation or job submission fails
     */
    @PublicEvolving
    public JobClient executeAsync(String str) throws Exception {
        // Validate the name before the graph is generated (generation clears the transformations).
        Preconditions.checkNotNull(str, "Streaming Job name should not be null.");

        final StreamGraph namedGraph = this.getStreamGraph();
        namedGraph.setJobName(str);

        return this.executeAsync(namedGraph);
    }

    /**
     * Submits the given stream graph through the executor selected by the configured
     * {@code DeploymentOptions.TARGET}.
     *
     * <p>Every registered job listener is notified through {@code onJobSubmitted}, with either
     * the job client or the submission failure. Only an {@link ExecutionException} is
     * translated into a {@code FlinkException}; any other exception propagates unchanged and
     * without a listener notification.
     *
     * @param streamGraph the graph to submit; must not be null
     * @return a client for the submitted job
     * @throws Exception if no execution target or no matching executor factory is available,
     *     or if the submission fails
     */
    @Internal
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        Preconditions.checkNotNull(streamGraph, "StreamGraph cannot be null.");
        Preconditions.checkNotNull(this.configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

        final PipelineExecutorFactory factory = this.executorServiceLoader.getExecutorFactory(this.configuration);
        Preconditions.checkNotNull(factory, "Cannot find compatible factory for specified execution.target (=%s)", this.configuration.get(DeploymentOptions.TARGET));

        try {
            final JobClient submittedJob =
                    factory.getExecutor(this.configuration)
                            .execute(streamGraph, this.configuration, this.userClassloader)
                            .get();
            this.jobListeners.forEach(listener -> listener.onJobSubmitted(submittedJob, null));
            return submittedJob;
        } catch (ExecutionException executionException) {
            // Report the real cause rather than the future's wrapper exception.
            final Throwable cause = ExceptionUtils.stripExecutionException(executionException);
            this.jobListeners.forEach(listener -> listener.onJobSubmitted(null, cause));

            final String message = String.format("Failed to execute job '%s'.", streamGraph.getJobName());
            throw new FlinkException(message, cause);
        }
    }

    /**
     * Generates the stream graph for the registered transformations and clears them
     * afterwards.
     *
     * @return the generated stream graph
     */
    @Internal
    public StreamGraph getStreamGraph() {
        final boolean clearTransformations = true;
        return this.getStreamGraph(clearTransformations);
    }

    /**
     * Generates the stream graph for the registered transformations.
     *
     * @param z whether to clear the registered transformations after the graph was generated
     * @return the generated stream graph
     */
    @Internal
    public StreamGraph getStreamGraph(boolean z) {
        final StreamGraph streamGraph = getStreamGraphGenerator(this.transformations).generate();
        if (!z) {
            return streamGraph;
        }
        // Clearing happens only after generation succeeded.
        this.transformations.clear();
        return streamGraph;
    }

    /**
     * Generates a stream graph for the given transformations, configured from this
     * environment's settings.
     *
     * <p>The environment's own registered transformations are neither read nor cleared.
     *
     * @param list the transformations to build the graph from
     * @return the generated stream graph
     */
    @Internal
    public StreamGraph generateStreamGraph(List<Transformation<?>> list) {
        return this.getStreamGraphGenerator(list).generate();
    }

    /**
     * Creates a graph generator for the given transformations and applies this environment's
     * settings to it.
     *
     * @param list the transformations to build the graph from
     * @return the configured generator
     * @throws IllegalStateException if {@code list} contains no transformations
     */
    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> list) {
        if (list.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }

        return new StreamGraphGenerator(list, this.config, this.checkpointCfg, this.configuration)
                .setStateBackend(this.defaultStateBackend)
                .setChangelogStateBackendEnabled(this.changelogStateBackendEnabled)
                .setSavepointDir(this.defaultSavepointDirectory)
                .setChaining(this.isChainingEnabled)
                .setUserArtifacts(this.cacheFile)
                .setTimeCharacteristic(this.timeCharacteristic)
                .setDefaultBufferTimeout(this.bufferTimeout)
                .setSlotSharingGroupResource(this.slotSharingGroupResources);
    }

    /**
     * Returns the streaming plan of the current topology as JSON.
     *
     * <p>The registered transformations are kept, so the job can still be executed
     * afterwards.
     *
     * @return the streaming plan as a JSON string
     */
    public String getExecutionPlan() {
        final boolean clearTransformations = false;
        return this.getStreamGraph(clearTransformations).getStreamingPlanAsJSON();
    }

    /**
     * Runs the closure cleaner on the given object (if enabled in the execution config) and
     * then verifies that the object is serializable.
     *
     * @param f the object to clean, typically a user function
     * @param <F> type of the object
     * @return the same instance that was passed in
     */
    @Internal
    public <F> F clean(F f) {
        final boolean cleanerEnabled = this.getConfig().isClosureCleanerEnabled();
        if (cleanerEnabled) {
            ClosureCleaner.clean(f, this.getConfig().getClosureCleanerLevel(), true);
        }

        // The serializability check runs regardless of the closure cleaner setting.
        ClosureCleaner.ensureSerializable(f);
        return f;
    }

    /**
     * Appends a transformation to the list that the stream graph is generated from.
     *
     * @param transformation the transformation to register; must not be null
     */
    @Internal
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        transformations.add(transformation);
    }

    /**
     * Returns this environment's configuration wrapped in an
     * {@code UnmodifiableConfiguration}.
     *
     * <p>A new wrapper is created on every call.
     *
     * @return the wrapped configuration
     */
    @Internal
    public ReadableConfig getConfiguration() {
        final UnmodifiableConfiguration readOnlyView = new UnmodifiableConfiguration(this.configuration);
        return readOnlyView;
    }

    /**
     * Creates an execution environment for the current context, using a fresh, empty
     * {@code Configuration}.
     *
     * @return the execution environment for the current context
     */
    public static StreamExecutionEnvironment getExecutionEnvironment() {
        final Configuration emptyConfiguration = new Configuration();
        return getExecutionEnvironment(emptyConfiguration);
    }

    /**
     * Creates an execution environment for the current context.
     *
     * <p>If {@code Utils.resolveFactory} yields a factory from the thread-local or the static
     * context environment factory, that factory creates the environment. Otherwise a local
     * environment is created from the given configuration.
     *
     * @param configuration configuration handed to the factory or to the local environment
     * @return the execution environment for the current context
     */
    public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
        return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
                .map(factory -> factory.createExecutionEnvironment(configuration))
                .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
    }

    /**
     * Creates a local environment that uses the current default local parallelism.
     *
     * @return the local execution environment
     */
    public static LocalStreamEnvironment createLocalEnvironment() {
        final int parallelism = defaultLocalParallelism;
        return createLocalEnvironment(parallelism);
    }

    /**
     * Creates a local environment with the given parallelism and an otherwise empty
     * configuration.
     *
     * @param i the parallelism of the local environment
     * @return the local execution environment
     */
    public static LocalStreamEnvironment createLocalEnvironment(int i) {
        final Configuration emptyConfiguration = new Configuration();
        return createLocalEnvironment(i, emptyConfiguration);
    }

    /**
     * Creates a local environment with the given parallelism on top of the given
     * configuration.
     *
     * <p>The passed configuration is copied first, so the caller's instance is not modified.
     * The parallelism overrides any {@code CoreOptions.DEFAULT_PARALLELISM} value already
     * present in it.
     *
     * @param i the parallelism of the local environment
     * @param configuration base configuration for the environment
     * @return the local execution environment
     */
    public static LocalStreamEnvironment createLocalEnvironment(int i, Configuration configuration) {
        final Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        // Plain typed set(option, value): an Integer value can never be cast to a ConfigOption.
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, i);
        return createLocalEnvironment(copyOfConfiguration);
    }

    /**
     * Creates a local environment from the given configuration.
     *
     * <p>If the configuration already defines {@code CoreOptions.DEFAULT_PARALLELISM}, it is
     * used as is. Otherwise it is copied and the copy gets the current default local
     * parallelism, so the caller's instance is not modified.
     *
     * @param configuration configuration for the environment
     * @return the local execution environment
     */
    public static LocalStreamEnvironment createLocalEnvironment(Configuration configuration) {
        if (configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).isPresent()) {
            return new LocalStreamEnvironment(configuration);
        }

        final Configuration copyOfConfiguration = new Configuration();
        copyOfConfiguration.addAll(configuration);
        // Plain typed set(option, value): an Integer value can never be cast to a ConfigOption.
        copyOfConfiguration.set(CoreOptions.DEFAULT_PARALLELISM, defaultLocalParallelism);
        return new LocalStreamEnvironment(copyOfConfiguration);
    }

    /**
     * Creates a local environment and makes sure a REST port is configured.
     *
     * <p>If the configuration does not contain {@code RestOptions.PORT}, the option's default
     * value is written into the <em>passed</em> configuration instance before the environment
     * is created.
     *
     * @param configuration configuration for the environment; must not be null
     * @return the local execution environment
     */
    @PublicEvolving
    public static StreamExecutionEnvironment createLocalEnvironmentWithWebUI(Configuration configuration) {
        Preconditions.checkNotNull(configuration, "conf");

        final boolean portConfigured = configuration.contains(RestOptions.PORT);
        if (!portConfigured) {
            final int defaultPort = RestOptions.PORT.defaultValue().intValue();
            configuration.setInteger(RestOptions.PORT, defaultPort);
        }

        return createLocalEnvironment(configuration);
    }

    /**
     * Creates a {@code RemoteStreamEnvironment} from the given arguments.
     *
     * @param str first constructor argument (the host of the remote cluster in Flink's API)
     * @param i second constructor argument (the port of the remote cluster in Flink's API)
     * @param strArr remaining constructor arguments (JAR files to ship in Flink's API)
     * @return the remote execution environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String str, int i, String... strArr) {
        final RemoteStreamEnvironment remoteEnvironment = new RemoteStreamEnvironment(str, i, strArr);
        return remoteEnvironment;
    }

    /**
     * Creates a {@code RemoteStreamEnvironment} and sets its parallelism.
     *
     * @param str first constructor argument (the host of the remote cluster in Flink's API)
     * @param i second constructor argument (the port of the remote cluster in Flink's API)
     * @param i2 the parallelism to set on the new environment
     * @param strArr remaining constructor arguments (JAR files to ship in Flink's API)
     * @return the remote execution environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String str, int i, int i2, String... strArr) {
        final RemoteStreamEnvironment remoteEnvironment = new RemoteStreamEnvironment(str, i, strArr);
        remoteEnvironment.setParallelism(i2);

        return remoteEnvironment;
    }

    /**
     * Creates a {@code RemoteStreamEnvironment} with a custom configuration.
     *
     * @param str first constructor argument (the host of the remote cluster in Flink's API)
     * @param i second constructor argument (the port of the remote cluster in Flink's API)
     * @param configuration configuration handed to the remote environment
     * @param strArr remaining constructor arguments (JAR files to ship in Flink's API)
     * @return the remote execution environment
     */
    public static StreamExecutionEnvironment createRemoteEnvironment(String str, int i, Configuration configuration, String... strArr) {
        final RemoteStreamEnvironment remoteEnvironment = new RemoteStreamEnvironment(str, i, configuration, strArr);
        return remoteEnvironment;
    }

    /**
     * Returns the parallelism that {@code createLocalEnvironment} applies when the caller
     * does not configure one.
     *
     * @return the current default local parallelism
     */
    @PublicEvolving
    public static int getDefaultLocalParallelism() {
        final int currentDefault = defaultLocalParallelism;
        return currentDefault;
    }

    /**
     * Sets the parallelism that {@code createLocalEnvironment} applies when the caller does
     * not configure one.
     *
     * <p>The value is stored in a static field as is; no validation is performed here.
     *
     * @param i the new default local parallelism
     */
    @PublicEvolving
    public static void setDefaultLocalParallelism(int i) {
        defaultLocalParallelism = i;
    }

    /**
     * Installs the factory that {@code getExecutionEnvironment} consults, both in the static
     * field and in the calling thread's thread-local slot.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * {@code protected}; the widened modifier is kept so existing callers keep compiling.
     *
     * @param streamExecutionEnvironmentFactory the factory to install
     */
    public static void initializeContextEnvironment(StreamExecutionEnvironmentFactory streamExecutionEnvironmentFactory) {
        contextEnvironmentFactory = streamExecutionEnvironmentFactory;
        // The thread-local receives the value read back from the static field.
        threadLocalContextEnvironmentFactory.set(contextEnvironmentFactory);
    }

    /**
     * Removes the context environment factory from the static field and from the calling
     * thread's thread-local slot.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally
     * {@code protected}; the widened modifier is kept so existing callers keep compiling.
     */
    public static void resetContextEnvironment() {
        contextEnvironmentFactory = null;
        // Only the calling thread's thread-local entry is removed.
        threadLocalContextEnvironmentFactory.remove();
    }

    /**
     * Registers a cached file with the boolean flag set to {@code false}.
     *
     * @param str value stored in the cache entry (the file path in Flink's API)
     * @param str2 name under which the entry is registered
     */
    public void registerCachedFile(String str, String str2) {
        final boolean flag = false;
        this.registerCachedFile(str, str2, flag);
    }

    /**
     * Registers a cached file by appending a (name, entry) pair to this environment's cache
     * file list.
     *
     * <p>No de-duplication happens here: registering the same name twice adds two pairs.
     *
     * @param str value stored in the cache entry (the file path in Flink's API)
     * @param str2 name under which the entry is registered
     * @param z flag stored in the cache entry (marks the file as executable in Flink's API)
     */
    public void registerCachedFile(String str, String str2, boolean z) {
        final DistributedCache.DistributedCacheEntry cacheEntry =
                new DistributedCache.DistributedCacheEntry(str, Boolean.valueOf(z));
        this.cacheFile.add(new Tuple2<>(str2, cacheEntry));
    }

    /**
     * Resolves the type information of the records produced by a source.
     *
     * <p>Resolution order:
     *
     * <ol>
     *   <li>the explicitly supplied {@code typeInformation}, if not null;
     *   <li>{@code getProducedType()} if the source implements {@code ResultTypeQueryable};
     *   <li>reflective extraction through {@code TypeExtractor.createTypeInfo}; if that throws
     *       an {@link InvalidTypesException}, a {@code MissingTypeInfo} built from the source
     *       name and the exception is returned instead of failing here.
     * </ol>
     *
     * @param obj the source object to inspect
     * @param str name of the source; used only for the {@code MissingTypeInfo} fallback
     * @param cls base class or interface whose first type parameter is extracted
     * @param typeInformation explicit type information, or null to have it resolved
     * @param <OUT> type of the produced records
     * @param <T> concrete type information type expected by the caller (unchecked)
     * @return the resolved type information
     */
    @SuppressWarnings("unchecked")
    private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(Object obj, String str, Class<?> cls, TypeInformation<OUT> typeInformation) {
        TypeInformation<OUT> resolvedTypeInfo = typeInformation;

        if (resolvedTypeInfo == null && obj instanceof ResultTypeQueryable) {
            // Unchecked: the source is trusted to declare the type it actually produces.
            resolvedTypeInfo = ((ResultTypeQueryable<OUT>) obj).getProducedType();
        }

        if (resolvedTypeInfo == null) {
            try {
                resolvedTypeInfo = TypeExtractor.createTypeInfo(cls, obj.getClass(), 0, null, null);
            } catch (InvalidTypesException e) {
                // MissingTypeInfo is not a TypeInformation<OUT>, so the cast must be explicit.
                resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(str, e);
            }
        }

        return (T) resolvedTypeInfo;
    }
}
