package org.apache.flink.streaming.api.operators;

import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Experimental base class for stream operators that are constructed from a
 * {@link StreamOperatorParameters} bundle and a fixed number of inputs.
 *
 * <p>The class wires up metrics, latency statistics, the runtime context and the state handler,
 * and provides default (mostly empty) implementations of the operator lifecycle methods.
 * Watermarks reported per input through {@link #reportWatermark(Watermark, int)} are combined
 * by taking the minimum over all inputs before being handed to
 * {@link #processWatermark(Watermark)}.
 *
 * @param <OUT> type of the elements emitted by this operator
 */
@Experimental
public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OUT>, StreamOperatorStateHandler.CheckpointedStreamOperator {
    /** Logger shared with subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class);

    /** Stream configuration taken from the constructor parameters. */
    protected final StreamConfig config;

    /** Output of this operator; wrapped in a record-counting output when metrics are available. */
    protected final Output<StreamRecord<OUT>> output;

    private final StreamingRuntimeContext runtimeContext;
    private final ExecutionConfig executionConfig;
    private final ClassLoader userCodeClassLoader;
    private final CloseableRegistry cancelables;

    /** Last watermark timestamp seen per input; slot {@code inputId - 1} belongs to input {@code inputId}. */
    private final long[] inputWatermarks;

    /** Metric group of this operator; an unregistered group if metric setup failed. */
    protected final OperatorMetricGroup metrics;

    /** Latency statistics fed by {@link #reportOrForwardLatencyMarker(LatencyMarker)}. */
    protected final LatencyStats latencyStats;

    /** Processing time service taken from the constructor parameters; never {@code null}. */
    protected final ProcessingTimeService processingTimeService;

    /** Created in {@link #initializeState(StreamTaskStateInitializer)}; {@code null} before that. */
    private StreamOperatorStateHandler stateHandler;

    /** Obtained from the state context during state initialization; may stay {@code null}. */
    private InternalTimeServiceManager<?> timeServiceManager;

    /** Largest combined (minimum over all inputs) watermark processed so far. */
    private long combinedWatermark = Long.MIN_VALUE;

    /**
     * Creates the operator and wires up metrics, output, latency statistics and runtime context.
     *
     * <p>If the operator metric group cannot be created, a warning is logged and an unregistered
     * metric group together with the plain (non-counting) output is used instead.
     *
     * @param streamOperatorParameters bundle providing the containing task, stream config, output
     *     and processing time service
     * @param i number of inputs of this operator; determines how many per-input watermarks are
     *     tracked
     */
    public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> streamOperatorParameters, int i) {
        this.inputWatermarks = new long[i];
        Arrays.fill(this.inputWatermarks, Long.MIN_VALUE);

        final Environment environment = streamOperatorParameters.getContainingTask().getEnvironment();
        this.config = streamOperatorParameters.getStreamConfig();

        CountingOutput countingOutput;
        OperatorMetricGroup operatorMetricGroup;
        try {
            operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(this.config.getOperatorID(), this.config.getOperatorName());
            countingOutput = new CountingOutput(streamOperatorParameters.getOutput(), operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
            if (this.config.isChainEnd()) {
                operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
            }
        } catch (Exception e) {
            // Metrics are best effort: fall back to unregistered metrics below instead of failing.
            LOG.warn("An error occurred while instantiating task metrics.", e);
            countingOutput = null;
            operatorMetricGroup = null;
        }

        if (countingOutput == null || operatorMetricGroup == null) {
            this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
            this.output = streamOperatorParameters.getOutput();
        } else {
            this.metrics = operatorMetricGroup;
            this.output = countingOutput;
        }

        this.latencyStats = createLatencyStats(
                environment.getTaskManagerInfo().getConfiguration(),
                streamOperatorParameters.getContainingTask().getIndexInSubtaskGroup());

        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(streamOperatorParameters.getProcessingTimeService());
        this.executionConfig = streamOperatorParameters.getContainingTask().getExecutionConfig();
        this.userCodeClassLoader = streamOperatorParameters.getContainingTask().getUserCodeClassLoader();
        this.cancelables = streamOperatorParameters.getContainingTask().getCancelables();

        // Pass this.metrics rather than the local operatorMetricGroup: the local is null when
        // metric setup failed, whereas this.metrics then holds the unregistered fallback group.
        this.runtimeContext = new StreamingRuntimeContext(
                environment,
                environment.getAccumulatorRegistry().getUserMap(),
                this.metrics,
                getOperatorID(),
                this.processingTimeService,
                null,
                environment.getExternalResourceInfoProvider());
    }

    /**
     * Builds the latency statistics from the given configuration.
     *
     * <p>A non-positive history size is replaced by the option's default, and an unparsable
     * granularity falls back to {@code OPERATOR}; both cases log a warning. Any other failure is
     * logged and answered with latency statistics backed by an unregistered metric group.
     *
     * @param configuration configuration to read the latency options from
     * @param i subtask index passed on to the latency statistics
     * @return the latency statistics; never {@code null}
     */
    private LatencyStats createLatencyStats(Configuration configuration, int i) {
        try {
            int historySize = configuration.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
            if (historySize <= 0) {
                LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
                historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue().intValue();
            }

            final String configuredGranularity = configuration.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
            LatencyStats.Granularity granularity;
            try {
                granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException invalidGranularity) {
                granularity = LatencyStats.Granularity.OPERATOR;
                LOG.warn("Configured value {} option for {} is invalid. Defaulting to {}.", configuredGranularity, MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), granularity);
            }

            // The "latency" group is attached two levels above the operator metric group.
            final MetricGroup latencyGroup = this.metrics.parent().parent().addGroup("latency");
            return new LatencyStats(latencyGroup, historySize, i, getOperatorID(), granularity);
        } catch (Exception failure) {
            LOG.warn("An error occurred while instantiating latency metrics.", failure);
            return new LatencyStats(
                    UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
                    1,
                    0,
                    new OperatorID(),
                    LatencyStats.Granularity.SINGLE);
        }
    }

    /** Returns the metric group of this operator. */
    @Override
    public MetricGroup getMetricGroup() {
        return this.metrics;
    }

    /**
     * Creates the state context and state handler for this operator, stores the timer service
     * manager provided by the context, and then lets the handler initialize the operator state.
     *
     * @param streamTaskStateInitializer factory for the operator's state context
     * @throws Exception if creating the state context or initializing the state fails
     */
    @Override
    public final void initializeState(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        final double managedMemoryFraction = this.config.getManagedMemoryFractionOperatorUseCaseOfSlot(
                ManagedMemoryUseCase.STATE_BACKEND,
                this.runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(),
                this.runtimeContext.getUserCodeClassLoader());

        final StreamOperatorStateContext stateContext = streamTaskStateInitializer.streamOperatorStateContext(
                getOperatorID(),
                getClass().getSimpleName(),
                getProcessingTimeService(),
                this,
                this.config.getStateKeySerializer(getUserCodeClassloader()),
                this.cancelables,
                this.metrics,
                managedMemoryFraction,
                isUsingCustomRawKeyedState());

        this.stateHandler = new StreamOperatorStateHandler(stateContext, getExecutionConfig(), this.cancelables);
        this.timeServiceManager = stateContext.internalTimerServiceManager();
        this.stateHandler.initializeOperatorState(this);
    }

    /**
     * Flag forwarded to state initialization and snapshotting; this base implementation
     * returns {@code false}.
     *
     * @return {@code false} unless overridden
     */
    @Internal
    protected boolean isUsingCustomRawKeyedState() {
        return false;
    }

    /** Lifecycle hook; the default implementation does nothing. */
    @Override
    public void open() throws Exception {
    }

    /** Lifecycle hook; the default implementation does nothing. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Disposes the state handler if state has been initialized.
     *
     * @throws Exception if disposing the state handler fails
     */
    @Override
    public void dispose() throws Exception {
        if (this.stateHandler != null) {
            this.stateHandler.dispose();
        }
    }

    /**
     * Pre-barrier snapshot hook; the default implementation does nothing.
     *
     * @param j checkpoint id
     */
    @Override
    public void prepareSnapshotPreBarrier(long j) throws Exception {
    }

    /**
     * Delegates the snapshot to the state handler, passing along the (possibly absent) timer
     * service manager and the operator name.
     *
     * @param j checkpoint id
     * @param j2 checkpoint timestamp
     * @param checkpointOptions options of the checkpoint
     * @param checkpointStreamFactory factory for the checkpoint output streams
     * @return the futures returned by the state handler
     * @throws Exception if the state handler fails to take the snapshot
     */
    @Override
    public final OperatorSnapshotFutures snapshotState(long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        return this.stateHandler.snapshotState(
                this,
                Optional.ofNullable(this.timeServiceManager),
                getOperatorName(),
                j,
                j2,
                checkpointOptions,
                checkpointStreamFactory,
                isUsingCustomRawKeyedState());
    }

    /**
     * Operator-specific snapshot hook; the default implementation does nothing.
     *
     * @param stateSnapshotContext context of the snapshot being taken
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
    }

    /**
     * Operator-specific state initialization hook; the default implementation does nothing.
     *
     * @param stateInitializationContext context for initializing operator state
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
    }

    /**
     * Forwards the checkpoint-complete notification to the state handler.
     *
     * @param j id of the completed checkpoint
     */
    @Override
    public void notifyCheckpointComplete(long j) throws Exception {
        this.stateHandler.notifyCheckpointComplete(j);
    }

    /**
     * Forwards the checkpoint-aborted notification to the state handler.
     *
     * @param j id of the aborted checkpoint
     */
    @Override
    public void notifyCheckpointAborted(long j) throws Exception {
        this.stateHandler.notifyCheckpointAborted(j);
    }

    /** Returns the execution config obtained from the containing task. */
    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /** Returns the stream configuration of this operator. */
    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    /** Returns the user code class loader obtained from the containing task. */
    public ClassLoader getUserCodeClassloader() {
        return this.userCodeClassLoader;
    }

    /**
     * Returns the task name with subtasks from the runtime context, or the simple class name if
     * no runtime context is available.
     */
    protected String getOperatorName() {
        if (this.runtimeContext == null) {
            return getClass().getSimpleName();
        }
        return this.runtimeContext.getTaskNameWithSubtasks();
    }

    /** Returns the runtime context created in the constructor. */
    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    /**
     * Returns the keyed state backend held by the state handler. Requires state to have been
     * initialized.
     *
     * @param <K> key type
     */
    @VisibleForTesting
    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return this.stateHandler.getKeyedStateBackend();
    }

    /**
     * Returns the operator state backend held by the state handler. Requires state to have been
     * initialized.
     */
    @VisibleForTesting
    public OperatorStateBackend getOperatorStateBackend() {
        return this.stateHandler.getOperatorStateBackend();
    }

    /** Returns the processing time service of this operator. */
    @VisibleForTesting
    public ProcessingTimeService getProcessingTimeService() {
        return this.processingTimeService;
    }

    /**
     * Looks up partitioned state in the {@link VoidNamespace}.
     *
     * @param stateDescriptor descriptor of the requested state
     * @return the state object provided by the state handler
     * @throws Exception if the state handler fails to provide the state
     */
    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return (S) getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    /**
     * Delegates to the state handler's {@code getOrCreateKeyedState}.
     *
     * @param typeSerializer serializer of the state's namespace
     * @param stateDescriptor descriptor of the requested state
     * @return the state object provided by the state handler
     * @throws Exception if the state handler fails to provide the state
     */
    protected <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        return (S) this.stateHandler.getOrCreateKeyedState(typeSerializer, stateDescriptor);
    }

    /**
     * Delegates to the state handler's {@code getPartitionedState} for the given namespace.
     *
     * @param n namespace of the requested state
     * @param typeSerializer serializer of the namespace
     * @param stateDescriptor descriptor of the requested state
     * @return the state object provided by the state handler
     * @throws Exception if the state handler fails to provide the state
     */
    protected <S extends State, N> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return (S) this.stateHandler.getPartitionedState(n, typeSerializer, stateDescriptor);
    }

    /**
     * Sets the current key to the key extracted from the record's value; does nothing when no
     * key selector is given.
     *
     * @param streamRecord record whose value the key is extracted from
     * @param keySelector key extractor, or {@code null} to leave the current key untouched
     * @throws Exception if the key selector fails
     */
    public <T> void internalSetKeyContextElement(StreamRecord<T> streamRecord, KeySelector<T, ?> keySelector) throws Exception {
        if (keySelector == null) {
            return;
        }
        setCurrentKey(keySelector.getKey(streamRecord.getValue()));
    }

    /**
     * Sets the current key on the state handler.
     *
     * @param obj the new current key
     */
    @Override
    public void setCurrentKey(Object obj) {
        this.stateHandler.setCurrentKey(obj);
    }

    /** Returns the current key held by the state handler. */
    @Override
    public Object getCurrentKey() {
        return this.stateHandler.getCurrentKey();
    }

    /**
     * Returns the keyed state store of the state handler, or an empty optional if state has not
     * been initialized yet.
     */
    public Optional<KeyedStateStore> getKeyedStateStore() {
        if (this.stateHandler == null) {
            return Optional.empty();
        }
        return this.stateHandler.getKeyedStateStore();
    }

    /**
     * Records the marker in the latency statistics and then emits it on the output.
     *
     * @param latencyMarker the latency marker to report and forward
     */
    public void reportOrForwardLatencyMarker(LatencyMarker latencyMarker) {
        this.latencyStats.reportLatency(latencyMarker);
        this.output.emitLatencyMarker(latencyMarker);
    }

    /**
     * Returns a timer service from the timer service manager, using the key serializer of the
     * keyed state backend.
     *
     * @param str name of the timer service
     * @param typeSerializer serializer of the timer namespace
     * @param triggerable callback invoked when timers fire
     * @return the timer service provided by the timer service manager
     * @throws RuntimeException if the timer service manager has not been initialized
     * @throws IllegalStateException if there is no keyed state backend
     */
    @VisibleForTesting
    public <K, N> InternalTimerService<N> getInternalTimerService(String str, TypeSerializer<N> typeSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        // The manager is stored with a wildcard key type, so the key type is re-established here.
        // The cast is unchecked; it is erased at runtime and does not change behaviour.
        @SuppressWarnings("unchecked")
        final InternalTimeServiceManager<K> keyedTimeServiceManager = (InternalTimeServiceManager<K>) this.timeServiceManager;
        final KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
        Preconditions.checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
        return keyedTimeServiceManager.getInternalTimerService(str, keyedStateBackend.getKeySerializer(), typeSerializer, triggerable);
    }

    /**
     * Advances the timer service manager (if present) to the watermark and then emits the
     * watermark on the output.
     *
     * @param watermark the watermark to process
     * @throws Exception if advancing the timer service manager fails
     */
    public void processWatermark(Watermark watermark) throws Exception {
        if (this.timeServiceManager != null) {
            this.timeServiceManager.advanceWatermark(watermark);
        }
        this.output.emitWatermark(watermark);
    }

    /**
     * Records the watermark of one input and, if the minimum over all inputs has advanced,
     * processes a new combined watermark.
     *
     * @param watermark watermark received on the input
     * @param i id of the input the watermark arrived on; ids start at 1
     * @throws Exception if processing the combined watermark fails
     */
    public void reportWatermark(Watermark watermark, int i) throws Exception {
        final long reported = watermark.getTimestamp();
        this.inputWatermarks[i - 1] = reported;

        long minimum = reported;
        for (long inputWatermark : this.inputWatermarks) {
            minimum = Math.min(inputWatermark, minimum);
        }

        // Only forward when the combined watermark strictly advances.
        if (minimum > this.combinedWatermark) {
            this.combinedWatermark = minimum;
            processWatermark(new Watermark(this.combinedWatermark));
        }
    }

    /** Returns the operator id stored in the stream configuration. */
    @Override
    public OperatorID getOperatorID() {
        return this.config.getOperatorID();
    }

    /**
     * Not supported by this operator base class.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void setKeyContextElement1(StreamRecord<?> streamRecord) throws Exception {
        throw new IllegalStateException("This method should never be called. Use Input class instead");
    }

    /**
     * Not supported by this operator base class.
     *
     * @throws IllegalStateException always
     */
    @Override
    public void setKeyContextElement2(StreamRecord<?> streamRecord) throws Exception {
        throw new IllegalStateException("This method should never be called. Use Input class instead");
    }

    /** Returns the timer service manager, or an empty optional if none has been set. */
    protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
        return Optional.ofNullable(this.timeServiceManager);
    }
}
