package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateBackend;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.util.Preconditions;

/**
 * Base class for keyed state backends.
 *
 * <p>Tracks the current key and the key group derived from it, creates partitioned state
 * lazily from {@link StateDescriptor}s, caches the created state objects by descriptor name,
 * and offers a generic way to merge the contents of several namespaces into one.
 *
 * <p>NOTE(review): nothing in this class synchronizes access to its mutable fields
 * ({@code currentKey}, {@code lastName}, {@code lastState}, {@code keyValueStatesByName});
 * it presumably relies on being used from a single thread - confirm against callers.
 *
 * @param <K> type of the key by which state is partitioned
 */
public abstract class AbstractKeyedStateBackend<K> implements KeyedStateBackend<K>, Snapshotable<KeyGroupsStateHandle>, Closeable {
    /** Serializer for keys; never {@code null} (checked in the constructor). */
    protected final TypeSerializer<K> keySerializer;

    /** Key most recently passed to {@link #setCurrentKey(Object)}. */
    protected K currentKey;

    /** Key group computed for {@link #currentKey} in {@link #setCurrentKey(Object)}. */
    private int currentKeyGroup;

    /** State objects created so far, by descriptor name. Created lazily; {@code null} after {@link #dispose()}. */
    protected HashMap<String, KvState<?>> keyValueStatesByName;

    /** Descriptor name of the most recently requested state (single-entry cache in front of the map). */
    private String lastName;

    /** State object belonging to {@link #lastName}. */
    private KvState<?> lastState;

    /** Number of key groups handed to {@code KeyGroupRangeAssignment} when assigning a key to a group. */
    protected final int numberOfKeyGroups;

    /** Key group range supplied at construction; also used when registering queryable state. */
    protected final KeyGroupRange keyGroupRange;

    /** Registry for queryable state; may be {@code null} (it is the only constructor argument not null-checked). */
    protected final TaskKvStateRegistry kvStateRegistry;

    /** Registry of closeables that is closed by {@link #close()}. */
    protected CloseableRegistry cancelStreamRegistry = new CloseableRegistry();

    /** Class loader supplied at construction; never {@code null}. */
    protected final ClassLoader userCodeClassLoader;

    /**
     * Creates the backend.
     *
     * @param taskKvStateRegistry registry for queryable state, may be {@code null}
     * @param typeSerializer serializer for the keys, must not be {@code null}
     * @param classLoader user code class loader, must not be {@code null}
     * @param i number of key groups
     * @param keyGroupRange key group range of this backend, must not be {@code null}
     */
    public AbstractKeyedStateBackend(TaskKvStateRegistry taskKvStateRegistry, TypeSerializer<K> typeSerializer, ClassLoader classLoader, int i, KeyGroupRange keyGroupRange) {
        this.kvStateRegistry = taskKvStateRegistry;
        this.keySerializer = Preconditions.checkNotNull(typeSerializer);
        // A primitive int cannot be null, so no null check (and no boxing round trip) is needed.
        this.numberOfKeyGroups = i;
        this.userCodeClassLoader = Preconditions.checkNotNull(classLoader);
        this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
    }

    /**
     * Unregisters everything from the queryable state registry (if one was supplied) and drops
     * the references to all cached state objects.
     */
    @Override
    public void dispose() {
        if (this.kvStateRegistry != null) {
            this.kvStateRegistry.unregisterAll();
        }
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName = null;
    }

    /**
     * Creates a {@link ValueState} for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param valueStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> value type
     */
    protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> typeSerializer, ValueStateDescriptor<T> valueStateDescriptor) throws Exception;

    /**
     * Creates a {@link ListState} for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param listStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> element type
     */
    protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> typeSerializer, ListStateDescriptor<T> listStateDescriptor) throws Exception;

    /**
     * Creates a {@link ReducingState} for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param reducingStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> value type
     */
    protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> typeSerializer, ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception;

    /**
     * Creates a {@link FoldingState} for the given descriptor.
     *
     * @param typeSerializer serializer for the namespace
     * @param foldingStateDescriptor descriptor of the state to create
     * @param <N> namespace type
     * @param <T> input value type
     * @param <ACC> accumulator type
     */
    protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> typeSerializer, FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception;

    /**
     * Sets the current key and recomputes the key group it is assigned to.
     *
     * @param k the new current key
     */
    @Override
    public void setCurrentKey(K k) {
        this.currentKey = k;
        this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(k, this.numberOfKeyGroups);
    }

    /** Returns the key serializer supplied at construction. */
    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    /** Returns the key most recently set via {@link #setCurrentKey(Object)}. */
    @Override
    public K getCurrentKey() {
        return this.currentKey;
    }

    /** Returns the key group computed for the current key. */
    @Override
    public int getCurrentKeyGroupIndex() {
        return this.currentKeyGroup;
    }

    /** Returns the number of key groups supplied at construction. */
    @Override
    public int getNumberOfKeyGroups() {
        return this.numberOfKeyGroups;
    }

    /** Returns the key group range supplied at construction. */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    /**
     * Returns the state described by {@code stateDescriptor}, with its current namespace set to
     * {@code n}. The state object is created on first request and afterwards served from a
     * cache keyed by the descriptor name.
     *
     * <p>If the descriptor is queryable, a newly created state is also registered with the
     * queryable state registry.
     *
     * @param n namespace to switch the state to, must not be {@code null}
     * @param typeSerializer serializer for the namespace, must not be {@code null}
     * @param stateDescriptor descriptor identifying (and able to create) the state
     * @param <N> namespace type
     * @param <S> state type
     * @return the state object, scoped to namespace {@code n}
     * @throws IllegalStateException if the descriptor is queryable but no registry was supplied
     * @throws Exception if creating the state fails
     */
    @Override
    @SuppressWarnings("unchecked") // cached states are stored as KvState<?>; the casts mirror how they were created
    public <N, S extends State> S getPartitionedState(N n, final TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(n, "Namespace");
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        // No null check on keySerializer here: the field is final and verified in the constructor.

        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
        }
        if (this.keyValueStatesByName == null) {
            this.keyValueStatesByName = new HashMap<>();
        }

        final String stateName = stateDescriptor.getName();

        // Fast path: same state as on the previous call, skip the map lookup.
        if (this.lastName != null && this.lastName.equals(stateName)) {
            ((KvState<N>) this.lastState).setCurrentNamespace(n);
            return (S) this.lastState;
        }

        // Previously created state: refresh the single-entry cache and return it.
        KvState<N> cachedState = (KvState<N>) this.keyValueStatesByName.get(stateName);
        if (cachedState != null) {
            this.lastState = cachedState;
            this.lastName = stateName;
            cachedState.setCurrentNamespace(n);
            return (S) cachedState;
        }

        // First request for this name: let the descriptor pick the matching factory method.
        S newState = stateDescriptor.bind(new StateBackend() {
            @Override
            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> valueStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(typeSerializer, valueStateDescriptor);
            }

            @Override
            public <T> ListState<T> createListState(ListStateDescriptor<T> listStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(typeSerializer, listStateDescriptor);
            }

            @Override
            public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> reducingStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createReducingState(typeSerializer, reducingStateDescriptor);
            }

            @Override
            public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> foldingStateDescriptor) throws Exception {
                return AbstractKeyedStateBackend.this.createFoldingState(typeSerializer, foldingStateDescriptor);
            }
        });

        // The created state is expected to also implement KvState; a ClassCastException here
        // means a subclass returned a state object that does not.
        KvState<N> newKvState = (KvState<N>) newState;
        this.keyValueStatesByName.put(stateName, newKvState);
        this.lastName = stateName;
        this.lastState = newKvState;
        newKvState.setCurrentNamespace(n);

        if (stateDescriptor.isQueryable()) {
            if (this.kvStateRegistry == null) {
                throw new IllegalStateException("State backend has not been initialized for job.");
            }
            this.kvStateRegistry.registerKvState(this.keyGroupRange, stateDescriptor.getQueryableStateName(), newKvState);
        }
        return newState;
    }

    /**
     * Merges the state stored under each namespace in {@code collection} into namespace
     * {@code n}, clearing the source namespaces along the way. Only reducing and list state
     * descriptors are supported.
     *
     * @param n target namespace that receives the merged state
     * @param collection source namespaces whose state is read and then cleared
     * @param typeSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the state to merge
     * @param <N> namespace type
     * @param <S> state type
     * @throws RuntimeException if the descriptor is neither a {@link ReducingStateDescriptor}
     *         nor a {@link ListStateDescriptor}
     * @throws Exception if accessing the state or applying the reduce function fails
     */
    @Override
    // The element type is not expressible through S extends MergingState<?, ?>, so the dispatch
    // goes through raw types; the typed helpers below do the actual work.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <N, S extends MergingState<?, ?>> void mergePartitionedStates(N n, Collection<N> collection, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (stateDescriptor instanceof ReducingStateDescriptor) {
            ReduceFunction reduceFunction = ((ReducingStateDescriptor) stateDescriptor).getReduceFunction();
            ReducingState reducingState = (ReducingState) getPartitionedState(n, typeSerializer, stateDescriptor);
            mergeReducingStates(n, collection, reducingState, reduceFunction);
            return;
        }
        if (!(stateDescriptor instanceof ListStateDescriptor)) {
            throw new RuntimeException("Cannot merge states for " + stateDescriptor);
        }
        ListState listState = (ListState) getPartitionedState(n, typeSerializer, stateDescriptor);
        mergeListStates(n, collection, listState);
    }

    /** Reduces the values of all source namespaces into one value and stores it under the target namespace. */
    private <N, T> void mergeReducingStates(N target, Collection<N> sources, ReducingState<T> state, ReduceFunction<T> reduceFunction) throws Exception {
        @SuppressWarnings("unchecked")
        KvState<N> namespaceView = (KvState<N>) state;

        T merged = null;
        for (N source : sources) {
            namespaceView.setCurrentNamespace(source);
            T sourceValue = state.get();
            if (merged == null) {
                // First value seen so far (may itself be null if this namespace is empty).
                merged = sourceValue;
            } else if (sourceValue != null) {
                merged = reduceFunction.reduce(merged, sourceValue);
            }
            state.clear();
        }

        // Switch to the target only after all sources were read and cleared, so that a target
        // which is also listed as a source is not wiped after the merged value is written.
        namespaceView.setCurrentNamespace(target);
        if (merged != null) {
            state.add(merged);
        }
    }

    /** Collects the elements of all source namespaces and appends them to the target namespace. */
    private <N, T> void mergeListStates(N target, Collection<N> sources, ListState<T> state) throws Exception {
        @SuppressWarnings("unchecked")
        KvState<N> namespaceView = (KvState<N>) state;

        // Buffer first: elements must be copied out before the source namespace is cleared.
        Collection<T> merged = new ArrayList<>();
        for (N source : sources) {
            namespaceView.setCurrentNamespace(source);
            Iterable<T> sourceValues = state.get();
            if (sourceValues != null) {
                for (T value : sourceValues) {
                    merged.add(value);
                }
            }
            state.clear();
        }

        namespaceView.setCurrentNamespace(target);
        for (T value : merged) {
            state.add(value);
        }
    }

    /**
     * Closes the {@link #cancelStreamRegistry}.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }
}
