package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.RegisteredOperatorBackendStateMetaInfo;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend.class */
public class DefaultOperatorStateBackend implements OperatorStateBackend {
    /** Logger used for the snapshot timing messages. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);
    /** Default state name constant; not referenced anywhere else in this class. */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
    /** Installed as the thread context classloader while states are copied for a snapshot and while state is restored. */
    private final ClassLoader userClassloader;
    /** Handed to state descriptors so they can initialize their serializer if none is set yet. */
    private final ExecutionConfig executionConfig;
    /** When {@code false}, {@code snapshot} runs the write task in the calling thread before returning it. */
    private final boolean asynchronousSnapshots;
    /** Tracks the streams opened by snapshot and restore; closed as a whole by {@link #close()}. */
    private final CloseableRegistry closeStreamOnCancelRegistry = new CloseableRegistry();
    /** Serializer used for the states created through {@code getSerializableListState}. */
    private final JavaSerializer<Serializable> javaSerializer = new JavaSerializer<>();
    /** All known states by name, populated by restore and by state access; this is the set that gets snapshotted. */
    private final Map<String, PartitionableListState<?>> registeredStates = new HashMap();
    /** States that were already handed out by the state accessors, so repeated lookups return the same instance. */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName = new HashMap<>();
    /** Meta info snapshots read during restore, by state name; consulted for the serializer compatibility check on first access. */
    private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/DefaultOperatorStateBackend$PartitionableListState.class */
    /**
     * {@link ListState} implementation that keeps its elements in an in-memory {@link ArrayList}
     * and can write them element-by-element to a stream, recording where each element starts.
     *
     * @param <S> type of the elements held by this state
     */
    public static final class PartitionableListState<S> implements ListState<S> {
        /** Meta information (name, element serializer, assignment mode); replaceable via {@link #setStateMetaInfo}. */
        private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
        /** The elements currently held by this state. */
        private final ArrayList<S> internalList;
        /** List serializer built from the element serializer; used only to copy {@link #internalList}. */
        private final ArrayListSerializer<S> internalListCopySerializer;

        /**
         * Creates an empty state for the given meta information.
         *
         * @param registeredOperatorBackendStateMetaInfo meta information of the state, must not be null
         */
        public PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> registeredOperatorBackendStateMetaInfo) {
            this(registeredOperatorBackendStateMetaInfo, new ArrayList<S>());
        }

        /** Creates a state that is backed directly by the given list (the list is not copied here). */
        private PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> registeredOperatorBackendStateMetaInfo, ArrayList<S> arrayList) {
            this.stateMetaInfo = Preconditions.checkNotNull(registeredOperatorBackendStateMetaInfo);
            this.internalList = Preconditions.checkNotNull(arrayList);
            this.internalListCopySerializer = new ArrayListSerializer<>(registeredOperatorBackendStateMetaInfo.getPartitionStateSerializer());
        }

        /** Copy constructor: shares the meta information and copies the element list through the copy serializer. */
        private PartitionableListState(PartitionableListState<S> partitionableListState) {
            this(
                    partitionableListState.stateMetaInfo,
                    partitionableListState.internalListCopySerializer.copy(partitionableListState.internalList));
        }

        /** Replaces the meta information of this state. */
        public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> registeredOperatorBackendStateMetaInfo) {
            stateMetaInfo = registeredOperatorBackendStateMetaInfo;
        }

        /** Returns the current meta information of this state. */
        public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
            return stateMetaInfo;
        }

        /** Returns a new state with the same meta information and a copy of the element list. */
        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<>(this);
        }

        /** Removes all elements from this state. */
        @Override
        public void clear() {
            internalList.clear();
        }

        /** Returns the backing list itself, not a copy. */
        @Override
        public Iterable<S> get() {
            return internalList;
        }

        /** Appends the given element to this state. */
        @Override
        public void add(S s) {
            internalList.add(s);
        }

        @Override
        public String toString() {
            return "PartitionableListState{stateMetaInfo=" + stateMetaInfo + ", internalList=" + internalList + '}';
        }

        /**
         * Serializes every element to the given stream, in list order.
         *
         * @param fSDataOutputStream stream to write to
         * @return the stream position at which each element starts, one entry per element
         * @throws IOException if querying the position or serializing an element fails
         */
        public long[] write(FSDataOutputStream fSDataOutputStream) throws IOException {
            final long[] startOffsets = new long[internalList.size()];
            final DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(fSDataOutputStream);
            int index = 0;
            for (S element : internalList) {
                // Record the position before writing so the element can be located again later.
                startOffsets[index++] = fSDataOutputStream.getPos();
                stateMetaInfo.getPartitionStateSerializer().serialize(element, outputView);
            }
            return startOffsets;
        }
    }

    /**
     * Creates a new operator state backend.
     *
     * @param classLoader user classloader, must not be null
     * @param executionConfig execution config handed to state descriptors; stored without a null check
     * @param z whether snapshots are left to run asynchronously ({@code true}) or are executed
     *     by {@code snapshot} itself before it returns ({@code false})
     * @throws IOException declared for callers; nothing in this constructor throws it
     */
    public DefaultOperatorStateBackend(ClassLoader classLoader, ExecutionConfig executionConfig, boolean z) throws IOException {
        userClassloader = Preconditions.checkNotNull(classLoader);
        asynchronousSnapshots = z;
        this.executionConfig = executionConfig;
    }

    /** Returns the execution config this backend was created with (may be null, it is not validated). */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Returns the names of all registered states.
     *
     * <p>The result is the key-set view of the internal map rather than a copy, so it reflects
     * later registrations.
     */
    @Override
    public Set<String> getRegisteredStateNames() {
        final Set<String> names = registeredStates.keySet();
        return names;
    }

    /**
     * Closes the registry that tracks the streams opened by snapshot and restore.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    /**
     * Closes this backend quietly and forgets all registered states.
     *
     * <p>Only the registered-states map is cleared; the accessed-states cache and the restored
     * meta infos are left untouched.
     */
    @Override
    public void dispose() {
        // Disposal must not fail, so errors from close() are deliberately ignored.
        IOUtils.closeQuietly(this);
        registeredStates.clear();
    }

    /**
     * Returns the list state for the given descriptor using the
     * {@link OperatorStateHandle.Mode#SPLIT_DISTRIBUTE} assignment mode.
     *
     * @param listStateDescriptor descriptor naming the state and its element serializer
     * @return the (possibly newly registered) list state
     * @throws Exception if the state cannot be obtained
     */
    @Override
    public <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final OperatorStateHandle.Mode assignmentMode = OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
        return getListState(listStateDescriptor, assignmentMode);
    }

    /**
     * Returns the list state for the given descriptor using the
     * {@link OperatorStateHandle.Mode#BROADCAST} assignment mode.
     *
     * @param listStateDescriptor descriptor naming the state and its element serializer
     * @return the (possibly newly registered) list state
     * @throws Exception if the state cannot be obtained
     */
    @Override
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final OperatorStateHandle.Mode assignmentMode = OperatorStateHandle.Mode.BROADCAST;
        return getListState(listStateDescriptor, assignmentMode);
    }

    /**
     * Deprecated alias that simply forwards to {@link #getListState(ListStateDescriptor)}.
     *
     * @param listStateDescriptor descriptor naming the state and its element serializer
     * @return the list state for the descriptor
     * @throws Exception if the state cannot be obtained
     * @deprecated use {@link #getListState(ListStateDescriptor)} instead
     */
    @Override
    @Deprecated
    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
        final ListState<S> state = getListState(listStateDescriptor);
        return state;
    }

    /**
     * Returns a list state with the given name whose elements are serialized with this
     * backend's Java serializer.
     *
     * @param str name of the state
     * @return the list state registered under that name
     * @throws Exception if the state cannot be obtained
     * @deprecated prefer {@link #getListState(ListStateDescriptor)} with an explicit descriptor
     */
    @Override
    @Deprecated
    @SuppressWarnings("unchecked") // the state is created for Serializable; callers choose the subtype T
    public <T extends Serializable> ListState<T> getSerializableListState(String str) throws Exception {
        final ListStateDescriptor<Serializable> descriptor = new ListStateDescriptor<>(str, javaSerializer);
        return (ListState<T>) getListState(descriptor);
    }

    /**
     * Snapshots all registered states into a stream obtained from the given factory.
     *
     * <p>The work is split in two parts. Synchronously, every registered state is deep-copied
     * while the user classloader is installed as the thread context classloader. The actual
     * writing is wrapped in a task that works on those copies only; that task is executed right
     * here when this backend is not configured for asynchronous snapshots, otherwise it is
     * returned unstarted for the caller to run.
     *
     * @param j first argument forwarded to the stream factory (presumably the checkpoint id - confirm against caller)
     * @param j2 second argument forwarded to the stream factory (presumably the checkpoint timestamp - confirm against caller)
     * @param checkpointStreamFactory factory that creates the output stream for this snapshot
     * @param checkpointOptions not used by this implementation
     * @return a future yielding the state handle; an already-completed future with a {@code null}
     *     value if no state is registered
     * @throws Exception if the synchronous part fails
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public RunnableFuture<OperatorStateHandle> snapshot(final long j, final long j2, final CheckpointStreamFactory checkpointStreamFactory, CheckpointOptions checkpointOptions) throws Exception {
        final long syncStartTime = System.currentTimeMillis();
        if (this.registeredStates.isEmpty()) {
            return DoneFuture.nullValue();
        }

        final Map<String, PartitionableListState<?>> stateCopies = new HashMap<>(this.registeredStates.size());

        // Copying elements may need user classes, so install the user classloader for the copy
        // phase only and always restore the previous one afterwards.
        final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.userClassloader);
        try {
            for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
                PartitionableListState<?> state = entry.getValue();
                if (state != null) {
                    state = state.deepCopy();
                }
                stateCopies.put(entry.getKey(), state);
            }
        } finally {
            Thread.currentThread().setContextClassLoader(previousClassLoader);
        }

        final AbstractAsyncCallableWithResources<OperatorStateHandle> ioCallable = new AbstractAsyncCallableWithResources<OperatorStateHandle>() {
            /** Output stream of this snapshot; opened in acquireResources(). */
            private CheckpointStreamFactory.CheckpointStateOutputStream out = null;

            @Override
            protected void acquireResources() throws Exception {
                openOutStream();
            }

            @Override
            protected void releaseResources() throws Exception {
                closeOutStream();
            }

            @Override
            protected void stopOperation() throws Exception {
                closeOutStream();
            }

            /** Opens the stream and registers it so that closing the backend also closes the stream. */
            private void openOutStream() throws Exception {
                this.out = checkpointStreamFactory.createCheckpointStateOutputStream(j, j2);
                DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.registerCloseable(this.out);
            }

            /** Closes the stream only if it is still registered, i.e. nobody else took over closing it. */
            private void closeOutStream() {
                if (DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.unregisterCloseable(this.out)) {
                    IOUtils.closeQuietly(this.out);
                }
            }

            /** Writes the meta information followed by the state contents and builds the resulting handle. */
            @Override
            public OperatorStateHandle performOperation() throws Exception {
                final long asyncStartTime = System.currentTimeMillis();
                final CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;

                final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<>(stateCopies.size());
                final ArrayList<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots = new ArrayList<>(stateCopies.size());
                for (PartitionableListState<?> state : stateCopies.values()) {
                    metaInfoSnapshots.add(state.getStateMetaInfo().snapshot());
                }

                final DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(localOut);
                new OperatorBackendSerializationProxy(metaInfoSnapshots).write(outputView);
                outputView.writeInt(stateCopies.size());

                for (Map.Entry<String, PartitionableListState<?>> entry : stateCopies.entrySet()) {
                    final PartitionableListState<?> state = entry.getValue();
                    final long[] partitionOffsets = state.write(localOut);
                    writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, state.getStateMetaInfo().getAssignmentMode()));
                }

                // Only finalize the stream if it is still registered; otherwise it was closed
                // concurrently and no handle can be produced.
                OperatorStateHandle result = null;
                if (DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.unregisterCloseable(this.out)) {
                    final StreamStateHandle streamHandle = this.out.closeAndGetHandle();
                    if (streamHandle != null) {
                        result = new OperatorStateHandle(writtenStatesMetaData, streamHandle);
                    }
                }

                if (DefaultOperatorStateBackend.this.asynchronousSnapshots) {
                    LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.", checkpointStreamFactory, Thread.currentThread(), System.currentTimeMillis() - asyncStartTime);
                }
                return result;
            }
        };

        final AsyncStoppableTaskWithCallback<OperatorStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
        if (!this.asynchronousSnapshots) {
            task.run();
        }
        LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took {} ms.", checkpointStreamFactory, Thread.currentThread(), System.currentTimeMillis() - syncStartTime);
        return task;
    }

    /**
     * Restores state from the given handles.
     *
     * <p>For every non-null handle the serialized meta information is read first; a state is
     * registered for each meta info whose name is not registered yet. Afterwards the elements
     * are read from the offsets recorded in the handle and appended to the matching state.
     * While a handle is processed the user classloader is the thread context classloader and
     * the input stream is registered with the close-on-cancel registry; both are undone in a
     * single {@code finally} block regardless of the outcome.
     *
     * @param collection handles to restore from; {@code null} and {@code null} entries are ignored
     * @throws IOException if the previous serializer of a state is missing or could not be loaded
     * @throws Exception if reading any handle fails
     */
    @Override // org.apache.flink.runtime.state.Snapshotable
    public void restore(Collection<OperatorStateHandle> collection) throws Exception {
        if (collection == null) {
            return;
        }
        for (OperatorStateHandle operatorStateHandle : collection) {
            if (operatorStateHandle == null) {
                continue;
            }
            final FSDataInputStream in = operatorStateHandle.openInputStream();
            this.closeStreamOnCancelRegistry.registerCloseable(in);
            final ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(this.userClassloader);
                final OperatorBackendSerializationProxy serializationProxy = new OperatorBackendSerializationProxy(this.userClassloader);
                serializationProxy.read(new DataInputViewStreamWrapper(in));

                // Recreate the states described by the restored meta information.
                for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo : serializationProxy.getStateMetaInfoSnapshots()) {
                    if (restoredMetaInfo.getPartitionStateSerializer() == null || (restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer)) {
                        throw new IOException("Unable to restore operator state [" + restoredMetaInfo.getName() + "]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    this.restoredStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
                    if (this.registeredStates.get(restoredMetaInfo.getName()) == null) {
                        final PartitionableListState<?> listState = new PartitionableListState<>(
                                new RegisteredOperatorBackendStateMetaInfo<>(
                                        restoredMetaInfo.getName(),
                                        restoredMetaInfo.getPartitionStateSerializer(),
                                        restoredMetaInfo.getAssignmentMode()));
                        this.registeredStates.put(listState.getStateMetaInfo().getName(), listState);
                    }
                }

                // Read the elements of every state listed in the handle.
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> nameToOffsets : operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
                    final PartitionableListState<?> stateForName = this.registeredStates.get(nameToOffsets.getKey());
                    Preconditions.checkState(null != stateForName, "Found state without corresponding meta info: " + nameToOffsets.getKey());
                    deserializeStateValues(stateForName, in, nameToOffsets.getValue());
                }
            } finally {
                Thread.currentThread().setContextClassLoader(previousClassLoader);
                // Close the stream only if it is still registered, i.e. the registry has not closed it already.
                if (this.closeStreamOnCancelRegistry.unregisterCloseable(in)) {
                    IOUtils.closeQuietly(in);
                }
            }
        }
    }

    /**
     * Looks up or registers the list state described by the given descriptor.
     *
     * <p>A state that was handed out before is returned from the accessed-states cache after
     * its name and mode were re-checked. Otherwise the descriptor's serializer is initialized
     * and the state is either newly registered or, if it is already registered (it was created
     * by restore), checked for compatibility against the restored serializer and then switched
     * to the descriptor's serializer.
     *
     * @param listStateDescriptor descriptor with the state name and element serializer, must not be null
     * @param mode assignment mode the state is requested with
     * @return the list state registered under the descriptor's name
     * @throws IOException declared for callers of this method
     * @throws StateMigrationException if the restored serializer requires a state migration
     * @throws IllegalStateException if the name or mode does not match an existing registration
     */
    private <S> ListState<S> getListState(ListStateDescriptor<S> listStateDescriptor, OperatorStateHandle.Mode mode) throws IOException, StateMigrationException {
        Preconditions.checkNotNull(listStateDescriptor);
        final String name = Preconditions.checkNotNull(listStateDescriptor.getName());

        // The maps are keyed by name only, so the element type cannot be verified statically.
        @SuppressWarnings("unchecked")
        final PartitionableListState<S> previouslyAccessed = (PartitionableListState<S>) this.accessedStatesByName.get(name);
        if (previouslyAccessed != null) {
            checkStateNameAndMode(previouslyAccessed.getStateMetaInfo(), name, mode);
            return previouslyAccessed;
        }

        listStateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
        final TypeSerializer<S> elementSerializer = Preconditions.checkNotNull(listStateDescriptor.getElementSerializer());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> listState = (PartitionableListState<S>) this.registeredStates.get(name);
        if (listState == null) {
            // First registration of this name.
            listState = new PartitionableListState<>(new RegisteredOperatorBackendStateMetaInfo<>(name, elementSerializer, mode));
            this.registeredStates.put(name, listState);
        } else {
            // Registered but never accessed: the state was created by restore.
            checkStateNameAndMode(listState.getStateMetaInfo(), name, mode);
            final RegisteredOperatorBackendStateMetaInfo.Snapshot<?> restoredMetaInfo = this.restoredStateMetaInfos.get(name);
            final boolean requiresMigration = CompatibilityUtil.resolveCompatibilityResult(
                    restoredMetaInfo.getPartitionStateSerializer(),
                    UnloadableDummyTypeSerializer.class,
                    restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(),
                    elementSerializer).isRequiresMigration();
            if (requiresMigration) {
                throw new StateMigrationException("State migration isn't supported, yet.");
            }
            // The new serializer is compatible, so it replaces the restored one.
            listState.setStateMetaInfo(new RegisteredOperatorBackendStateMetaInfo<>(name, elementSerializer, mode));
        }

        this.accessedStatesByName.put(name, listState);
        return listState;
    }

    /**
     * Reads the elements located at the recorded offsets and appends them to the given state.
     *
     * <p>Does nothing when the offset information, or its offset array, is {@code null}.
     *
     * @param targetState state that receives the deserialized elements
     * @param in stream to read from; repositioned to each offset before reading
     * @param offsetsInfo holder of the element start offsets, may be null
     * @throws IOException if seeking or deserializing fails
     */
    private static <S> void deserializeStateValues(PartitionableListState<S> targetState, FSDataInputStream in, OperatorStateHandle.StateMetaInfo offsetsInfo) throws IOException {
        if (offsetsInfo == null) {
            return;
        }
        final long[] elementOffsets = offsetsInfo.getOffsets();
        if (elementOffsets == null) {
            return;
        }

        final DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(in);
        final TypeSerializer<S> elementSerializer = targetState.getStateMetaInfo().getPartitionStateSerializer();
        for (int i = 0; i < elementOffsets.length; i++) {
            in.seek(elementOffsets[i]);
            final S element = elementSerializer.deserialize(inputView);
            targetState.add(element);
        }
    }

    /**
     * Verifies that an existing registration matches the name and assignment mode it is
     * requested with.
     *
     * @param metaInfo meta information of the existing registration
     * @param requestedName name the state is requested under
     * @param requestedMode assignment mode the state is requested with
     * @throws IllegalStateException if the name or the mode differs
     */
    private static void checkStateNameAndMode(RegisteredOperatorBackendStateMetaInfo<?> metaInfo, String requestedName, OperatorStateHandle.Mode requestedMode) {
        final boolean sameName = metaInfo.getName().equals(requestedName);
        Preconditions.checkState(
                sameName,
                "Incompatible state names. Was [" + metaInfo.getName() + "], registered with [" + requestedName + "].");

        final boolean sameMode = metaInfo.getAssignmentMode().equals(requestedMode);
        Preconditions.checkState(
                sameMode,
                "Incompatible state assignment modes. Was [" + metaInfo.getAssignmentMode() + "], registered with [" + requestedMode + "].");
    }
}
