package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.InternalCheckpointListener;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateFactory;
import org.apache.flink.runtime.state.ttl.TtlStateFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/AbstractKeyedStateBackend.class */
public abstract class AbstractKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K>, InternalCheckpointListener, TestableKeyedStateBackend<K>, InternalKeyContext<K> {
    protected final TypeSerializer<K> keySerializer;
    private final ArrayList<KeyedStateBackend.KeySelectionListener<K>> keySelectionListeners;
    private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
    private String lastName;
    private InternalKvState lastState;
    protected final int numberOfKeyGroups;
    protected final KeyGroupRange keyGroupRange;
    protected final TaskKvStateRegistry kvStateRegistry;
    protected CloseableRegistry cancelStreamRegistry;
    protected final ClassLoader userCodeClassLoader;
    private final ExecutionConfig executionConfig;
    protected final TtlTimeProvider ttlTimeProvider;
    protected final LatencyTrackingStateConfig latencyTrackingStateConfig;
    protected final StreamCompressionDecorator keyGroupCompressionDecorator;
    protected final InternalKeyContext<K> keyContext;

    /* loaded from: input_file:org/apache/flink/runtime/state/AbstractKeyedStateBackend$PartitionStateFactory.class */
    public interface PartitionStateFactory {
        <N, S extends State> S get(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception;
    }

    public AbstractKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, CloseableRegistry closeableRegistry, InternalKeyContext<K> internalKeyContext) {
        this(taskKvStateRegistry, typeSerializer, classLoader, executionConfig, ttlTimeProvider, latencyTrackingStateConfig, closeableRegistry, determineStreamCompression(executionConfig), internalKeyContext);
    }

    public AbstractKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, CloseableRegistry closeableRegistry, StreamCompressionDecorator streamCompressionDecorator, InternalKeyContext<K> internalKeyContext) {
        this(taskKvStateRegistry, typeSerializer, classLoader, executionConfig, ttlTimeProvider, latencyTrackingStateConfig, closeableRegistry, streamCompressionDecorator, (InternalKeyContext) Preconditions.checkNotNull(internalKeyContext), internalKeyContext.getNumberOfKeyGroups(), internalKeyContext.getKeyGroupRange(), new HashMap(), new ArrayList(1), null, null);
    }

    protected AbstractKeyedStateBackend(AbstractKeyedStateBackend<K> abstractKeyedStateBackend) {
        this(abstractKeyedStateBackend.kvStateRegistry, abstractKeyedStateBackend.keySerializer, abstractKeyedStateBackend.userCodeClassLoader, abstractKeyedStateBackend.executionConfig, abstractKeyedStateBackend.ttlTimeProvider, abstractKeyedStateBackend.latencyTrackingStateConfig, abstractKeyedStateBackend.cancelStreamRegistry, abstractKeyedStateBackend.keyGroupCompressionDecorator, abstractKeyedStateBackend.keyContext, abstractKeyedStateBackend.numberOfKeyGroups, abstractKeyedStateBackend.keyGroupRange, abstractKeyedStateBackend.keyValueStatesByName, abstractKeyedStateBackend.keySelectionListeners, abstractKeyedStateBackend.lastState, abstractKeyedStateBackend.lastName);
    }

    private AbstractKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, LatencyTrackingStateConfig latencyTrackingStateConfig, CloseableRegistry closeableRegistry, StreamCompressionDecorator streamCompressionDecorator, InternalKeyContext<K> internalKeyContext, int i, KeyGroupRange keyGroupRange, HashMap<String, InternalKvState<K, ?, ?>> hashMap, ArrayList<KeyedStateBackend.KeySelectionListener<K>> arrayList, InternalKvState internalKvState, String str) {
        this.keyContext = (InternalKeyContext) Preconditions.checkNotNull(internalKeyContext);
        this.numberOfKeyGroups = i;
        this.keyGroupRange = (KeyGroupRange) Preconditions.checkNotNull(keyGroupRange);
        Preconditions.checkArgument(i >= 1, "NumberOfKeyGroups must be a positive number");
        Preconditions.checkArgument(i >= keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be at least the number in the key group range assigned to this backend. The total number of key groups: %s, the number in key groups in range: %s", Integer.valueOf(i), Integer.valueOf(keyGroupRange.getNumberOfKeyGroups()));
        this.kvStateRegistry = taskKvStateRegistry;
        this.keySerializer = typeSerializer;
        this.userCodeClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.cancelStreamRegistry = closeableRegistry;
        this.keyValueStatesByName = hashMap;
        this.executionConfig = executionConfig;
        this.keyGroupCompressionDecorator = streamCompressionDecorator;
        this.ttlTimeProvider = (TtlTimeProvider) Preconditions.checkNotNull(ttlTimeProvider);
        this.latencyTrackingStateConfig = (LatencyTrackingStateConfig) Preconditions.checkNotNull(latencyTrackingStateConfig);
        this.keySelectionListeners = arrayList;
        this.lastState = internalKvState;
        this.lastName = str;
    }

    private static StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
        return (executionConfig == null || !executionConfig.isUseSnapshotCompression()) ? UncompressedStreamCompressionDecorator.INSTANCE : SnappyStreamCompressionDecorator.INSTANCE;
    }

    @Override // org.apache.flink.api.common.state.InternalCheckpointListener
    public void notifyCheckpointSubsumed(long j) throws Exception {
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend, org.apache.flink.util.Disposable
    public void dispose() {
        IOUtils.closeQuietly(this.cancelStreamRegistry);
        if (this.kvStateRegistry != null) {
            this.kvStateRegistry.unregisterAll();
        }
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName.clear();
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend, org.apache.flink.runtime.state.heap.InternalKeyContext
    public void setCurrentKey(K k) {
        notifyKeySelected(k);
        this.keyContext.setCurrentKey(k);
        this.keyContext.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(k, this.numberOfKeyGroups));
    }

    private void notifyKeySelected(K k) {
        for (int i = 0; i < this.keySelectionListeners.size(); i++) {
            this.keySelectionListeners.get(i).keySelected(k);
        }
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        this.keySelectionListeners.add(keySelectionListener);
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        return this.keySelectionListeners.remove(keySelectionListener);
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend, org.apache.flink.runtime.state.heap.InternalKeyContext
    public K getCurrentKey() {
        return this.keyContext.getCurrentKey();
    }

    @Override // org.apache.flink.runtime.state.heap.InternalKeyContext
    public int getCurrentKeyGroupIndex() {
        return this.keyContext.getCurrentKeyGroupIndex();
    }

    @Override // org.apache.flink.runtime.state.heap.InternalKeyContext
    public int getNumberOfKeyGroups() {
        return this.numberOfKeyGroups;
    }

    @Override // org.apache.flink.runtime.state.CheckpointableKeyedStateBackend, org.apache.flink.runtime.state.heap.InternalKeyContext
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public <N, S extends State, T> void applyToAllKeys(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction) throws Exception {
        applyToAllKeys(n, typeSerializer, stateDescriptor, keyedStateFunction, this::getPartitionedState);
    }

    public <N, S extends State, T> void applyToAllKeys(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction, PartitionStateFactory partitionStateFactory) throws Exception {
        Stream<K> keys = getKeys(stateDescriptor.getName(), n);
        Throwable th = null;
        try {
            try {
                State state = partitionStateFactory.get(n, typeSerializer, stateDescriptor);
                keys.forEach(obj -> {
                    setCurrentKey(obj);
                    try {
                        keyedStateFunction.process(obj, state);
                    } catch (Throwable th2) {
                        throw new RuntimeException(th2);
                    }
                });
                if (keys != null) {
                    if (0 == 0) {
                        keys.close();
                        return;
                    }
                    try {
                        keys.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (keys != null) {
                if (th != null) {
                    try {
                        keys.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    keys.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public <N, S extends State, V> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        Preconditions.checkNotNull(this.keySerializer, "State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        InternalKvState<?, ?, ?> internalKvState = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (internalKvState == null) {
            if (!stateDescriptor.isSerializerInitialized()) {
                stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
            }
            internalKvState = LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled((InternalKvState) TtlStateFactory.createStateAndWrapWithTtlIfEnabled(typeSerializer, stateDescriptor, this, this.ttlTimeProvider), stateDescriptor, this.latencyTrackingStateConfig);
            this.keyValueStatesByName.put(stateDescriptor.getName(), internalKvState);
            publishQueryableStateIfEnabled(stateDescriptor, internalKvState);
        }
        return internalKvState;
    }

    public void publishQueryableStateIfEnabled(StateDescriptor<?, ?> stateDescriptor, InternalKvState<?, ?, ?> internalKvState) {
        if (stateDescriptor.isQueryable()) {
            if (this.kvStateRegistry == null) {
                throw new IllegalStateException("State backend has not been initialized for job.");
            }
            this.kvStateRegistry.registerKvState(this.keyGroupRange, stateDescriptor.getQueryableStateName(), internalKvState, this.userCodeClassLoader);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.state.KeyedStateBackend
    public <N, S extends State> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(n, "Namespace");
        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(n);
            return this.lastState;
        }
        InternalKvState<K, ?, ?> internalKvState = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (internalKvState != null) {
            this.lastState = internalKvState;
            this.lastState.setCurrentNamespace(n);
            this.lastName = stateDescriptor.getName();
            return internalKvState;
        }
        InternalKvState internalKvState2 = (S) getOrCreateKeyedState(typeSerializer, stateDescriptor);
        InternalKvState internalKvState3 = internalKvState2;
        this.lastName = stateDescriptor.getName();
        this.lastState = internalKvState3;
        internalKvState3.setCurrentNamespace(n);
        return internalKvState2;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }

    public LatencyTrackingStateConfig getLatencyTrackingStateConfig() {
        return this.latencyTrackingStateConfig;
    }

    @VisibleForTesting
    public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
        return this.keyGroupCompressionDecorator;
    }

    @VisibleForTesting
    public int numKeyValueStatesByName() {
        return this.keyValueStatesByName.size();
    }

    public boolean requiresLegacySynchronousTimerSnapshots(SnapshotType snapshotType) {
        return false;
    }

    public InternalKeyContext<K> getKeyContext() {
        return this.keyContext;
    }

    @Override // org.apache.flink.runtime.state.heap.InternalKeyContext
    public void setCurrentKeyGroupIndex(int i) {
        this.keyContext.setCurrentKeyGroupIndex(i);
    }
}
