package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateBinder;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

/**
 * Base class for keyed state backends.
 *
 * <p>This class keeps track of the key that is currently active (and the key group it was
 * assigned to), caches every state object it has handed out under the name of its
 * descriptor, and optionally publishes queryable states to a {@link TaskKvStateRegistry}.
 * Creating the concrete state objects is delegated to subclasses through the
 * {@code create*State} methods.
 *
 * <p>No synchronization is performed in this class; the name-to-state cache is a plain
 * {@link HashMap} and the "last accessed state" shortcut lives in unsynchronized fields.
 *
 * @param <K> type of the key by which state is partitioned
 */
public abstract class AbstractKeyedStateBackend<K> implements KeyedStateBackend<K>, Snapshotable<KeyedStateHandle>, Closeable, CheckpointListener {

    /** Serializer for the key type; never {@code null} once construction succeeded. */
    protected final TypeSerializer<K> keySerializer;

    /** Key most recently passed to {@link #setCurrentKey(Object)}. */
    protected K currentKey;

    /** Key group computed for {@link #currentKey} in {@link #setCurrentKey(Object)}. */
    private int currentKeyGroup;

    /** Descriptor name of the state most recently returned by {@link #getPartitionedState}. */
    private String lastName;

    /**
     * State most recently returned by {@link #getPartitionedState}. Kept as a raw type because
     * the namespace type differs from call to call.
     */
    @SuppressWarnings("rawtypes")
    private InternalKvState lastState;

    /** Number of key groups handed to the constructor. */
    protected final int numberOfKeyGroups;

    /** Key-group range handed to the constructor. */
    protected final KeyGroupRange keyGroupRange;

    /** Registry for queryable state; may be {@code null}, in which case queryable states are rejected. */
    protected final TaskKvStateRegistry kvStateRegistry;

    /** Class loader for user code, handed to the constructor. */
    protected final ClassLoader userCodeClassLoader;

    /** Configuration used to initialize serializers of state descriptors that have none yet. */
    private final ExecutionConfig executionConfig;

    /** Registry that is closed when this backend is {@linkplain #close() closed}. */
    protected CloseableRegistry cancelStreamRegistry = new CloseableRegistry();

    /** All states created so far, keyed by the name of their descriptor. */
    protected final HashMap<String, InternalKvState<?>> keyValueStatesByName = new HashMap<>();

    /**
     * Creates the backend.
     *
     * @param taskKvStateRegistry registry for queryable state, may be {@code null}
     * @param typeSerializer serializer for the key type, must not be {@code null}
     * @param classLoader user code class loader, must not be {@code null}
     * @param i total number of key groups
     * @param keyGroupRange key-group range of this backend, must not be {@code null}
     * @param executionConfig configuration used when initializing descriptor serializers
     */
    public AbstractKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig) {
        this.kvStateRegistry = taskKvStateRegistry;
        this.keySerializer = Preconditions.checkNotNull(typeSerializer);
        // A primitive int can never be null, so it is stored directly instead of being
        // boxed, null-checked and unboxed again.
        this.numberOfKeyGroups = i;
        this.userCodeClassLoader = Preconditions.checkNotNull(classLoader);
        this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
        this.executionConfig = executionConfig;
    }

    /**
     * Closes this backend through {@link IOUtils#closeQuietly}, unregisters everything from
     * the queryable-state registry (if one was supplied) and drops all cached states.
     */
    @Override
    public void dispose() {
        IOUtils.closeQuietly(this);
        if (this.kvStateRegistry != null) {
            this.kvStateRegistry.unregisterAll();
        }
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName.clear();
    }

    /**
     * Creates the backend-specific value state for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param valueStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> value type
     * @return the newly created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T> InternalValueState<N, T> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<T> valueStateDescriptor) throws Exception;

    /**
     * Creates the backend-specific list state for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param listStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> element type
     * @return the newly created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T> InternalListState<N, T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception;

    /**
     * Creates the backend-specific reducing state for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param reducingStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> value type
     * @return the newly created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T> InternalReducingState<N, T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception;

    /**
     * Creates the backend-specific aggregating state for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param aggregatingStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> input type
     * @param <ACC> accumulator type
     * @param <R> result type
     * @return the newly created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(TypeSerializer<N> typeSerializer, AggregatingStateDescriptor<T, ACC, R> aggregatingStateDescriptor) throws Exception;

    /**
     * Creates the backend-specific folding state for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param foldingStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> input type
     * @param <ACC> accumulator type
     * @return the newly created state
     * @throws Exception if the state cannot be created
     * @deprecated marked deprecated in this class; no replacement is named here
     */
    @Deprecated
    protected abstract <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception;

    /**
     * Creates the backend-specific map state for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param mapStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <UK> user key type
     * @param <UV> user value type
     * @return the newly created state
     * @throws Exception if the state cannot be created
     */
    protected abstract <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> typeSerializer, MapStateDescriptor<UK, UV> mapStateDescriptor) throws Exception;

    /**
     * Makes {@code k} the current key and records the key group that
     * {@link KeyGroupRangeAssignment#assignToKeyGroup} computes for it.
     *
     * @param k the new current key
     */
    @Override
    public void setCurrentKey(K k) {
        this.currentKey = k;
        this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(k, this.numberOfKeyGroups);
    }

    /** @return the key serializer supplied at construction time */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    /** @return the key most recently set through {@link #setCurrentKey(Object)} */
    @Override
    public K getCurrentKey() {
        return this.currentKey;
    }

    /** @return the key group computed for the current key in {@link #setCurrentKey(Object)} */
    @Override
    public int getCurrentKeyGroupIndex() {
        return this.currentKeyGroup;
    }

    /** @return the number of key groups supplied at construction time */
    @Override
    public int getNumberOfKeyGroups() {
        return this.numberOfKeyGroups;
    }

    /** @return the key-group range supplied at construction time */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    /**
     * Returns the state registered under the descriptor's name, creating and caching it on
     * first use. A newly created state whose descriptor is queryable is additionally
     * registered with the {@link TaskKvStateRegistry}.
     *
     * @param typeSerializer serializer for the namespace, must not be {@code null}
     * @param stateDescriptor descriptor identifying (and, if needed, creating) the state
     * @param <N> namespace type
     * @param <S> state type
     * @param <V> type of the values held by the state
     * @return the cached or newly created state
     * @throws UnsupportedOperationException if no key serializer is configured
     * @throws IllegalStateException if the state is queryable but no registry was supplied
     * @throws Exception if the subclass fails to create the state
     */
    @Override
    public <N, S extends State, V> S getOrCreateKeyedState(final TypeSerializer<N> typeSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        // Defensive only: the constructor already null-checks the key serializer.
        if (this.keySerializer == null) {
            throw new UnsupportedOperationException("State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        }
        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
        }

        final InternalKvState<?> existing = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (existing != null) {
            // The cache is keyed by name only, so the static type must be restored by an
            // unchecked cast; the caller's descriptor determines S.
            @SuppressWarnings("unchecked")
            final S typedExisting = (S) existing;
            return typedExisting;
        }

        // Let the descriptor pick the matching factory method; each one forwards to the
        // subclass hook together with the namespace serializer captured from this call.
        final S created = stateDescriptor.bind(new StateBinder() {
            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(typeSerializer, valueStateDescriptor);
            }

            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> listStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(typeSerializer, listStateDescriptor);
            }

            @Override
            public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createReducingState(typeSerializer, reducingStateDescriptor);
            }

            @Override
            public <T, ACC, R> AggregatingState<T, R> createAggregatingState(AggregatingStateDescriptor<T, ACC, R> aggregatingStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createAggregatingState(typeSerializer, aggregatingStateDescriptor);
            }

            @Override
            public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createFoldingState(typeSerializer, foldingStateDescriptor);
            }

            @Override
            public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> mapStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createMapState(typeSerializer, mapStateDescriptor);
            }
        });

        // Every create*State hook returns an Internal*State, so the bound state is expected
        // to be an InternalKvState as well; a violation surfaces as ClassCastException here.
        final InternalKvState<?> createdKvState = (InternalKvState<?>) created;
        this.keyValueStatesByName.put(stateDescriptor.getName(), createdKvState);

        if (stateDescriptor.isQueryable()) {
            if (this.kvStateRegistry == null) {
                throw new IllegalStateException("State backend has not been initialized for job.");
            }
            this.kvStateRegistry.registerKvState(this.keyGroupRange, stateDescriptor.getQueryableStateName(), createdKvState);
        }
        return created;
    }

    /**
     * Returns the state for the given descriptor with its current namespace set to {@code n}.
     *
     * <p>The state returned by the previous call is remembered together with its name, so
     * repeated access to the same state skips the map lookup. Otherwise the state is taken
     * from the by-name cache or created through {@link #getOrCreateKeyedState}.
     *
     * @param n namespace to activate on the returned state, must not be {@code null}
     * @param typeSerializer serializer for the namespace
     * @param stateDescriptor descriptor identifying the state
     * @param <N> namespace type
     * @param <S> state type
     * @return the state, scoped to namespace {@code n}
     * @throws Exception if the state has to be created and creation fails
     */
    @SuppressWarnings("unchecked") // raw lastState and name-keyed cache require unchecked casts to S
    @Override
    public <N, S extends State> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(n, "Namespace");

        // Fast path: same state as on the previous call.
        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(n);
            return (S) this.lastState;
        }

        // Already created earlier, but not the most recently used one.
        final InternalKvState<?> cached = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (cached != null) {
            this.lastState = cached;
            this.lastState.setCurrentNamespace(n);
            this.lastName = stateDescriptor.getName();
            return (S) cached;
        }

        // First access: create the state, then remember it for the fast path.
        final S created = getOrCreateKeyedState(typeSerializer, stateDescriptor);
        final InternalKvState<N> createdKvState = (InternalKvState<N>) created;
        this.lastName = stateDescriptor.getName();
        this.lastState = createdKvState;
        createdKvState.setCurrentNamespace(n);
        return created;
    }

    /**
     * Closes the {@link #cancelStreamRegistry}.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }

    /**
     * Reports whether this backend supports asynchronous snapshots.
     *
     * @return {@code false} in this base class
     */
    @VisibleForTesting
    public boolean supportsAsynchronousSnapshots() {
        return false;
    }
}
