package org.apache.flink.runtime.state.changelog;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;

@Internal
/* loaded from: input_file:org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl.class */
public final class StateChangelogHandleStreamImpl implements StateChangelogHandle<StateChangeStreamReader> {
    private static final long serialVersionUID = -8070326169926626355L;
    private final KeyGroupRange keyGroupRange;
    private final List<Tuple2<StreamStateHandle, Long>> handlesAndOffsets;
    private transient SharedStateRegistry stateRegistry;

    /* loaded from: input_file:org/apache/flink/runtime/state/changelog/StateChangelogHandleStreamImpl$StateChangeStreamReader.class */
    public interface StateChangeStreamReader {
        CloseableIterator<StateChange> read(StreamStateHandle streamStateHandle, long j) throws IOException;
    }

    public StateChangelogHandleStreamImpl(List<Tuple2<StreamStateHandle, Long>> list, KeyGroupRange keyGroupRange) {
        this.handlesAndOffsets = list;
        this.keyGroupRange = keyGroupRange;
    }

    @Override // org.apache.flink.runtime.state.CompositeStateHandle
    public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
        this.stateRegistry = sharedStateRegistry;
        this.handlesAndOffsets.forEach(tuple2 -> {
            sharedStateRegistry.registerReference(getKey((StreamStateHandle) tuple2.f0), (StreamStateHandle) tuple2.f0);
        });
    }

    @Override // org.apache.flink.runtime.state.KeyedStateHandle
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Override // org.apache.flink.runtime.state.KeyedStateHandle
    @Nullable
    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
        KeyGroupRange intersection = keyGroupRange.getIntersection(keyGroupRange);
        if (intersection.getNumberOfKeyGroups() == 0) {
            return null;
        }
        return new StateChangelogHandleStreamImpl(this.handlesAndOffsets, intersection);
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogHandle
    public CloseableIterator<StateChange> getChanges(final StateChangeStreamReader stateChangeStreamReader) {
        return new CloseableIterator<StateChange>() { // from class: org.apache.flink.runtime.state.changelog.StateChangelogHandleStreamImpl.1
            private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator;
            private CloseableIterator<StateChange> current = CloseableIterator.empty();

            {
                this.handleIterator = StateChangelogHandleStreamImpl.this.handlesAndOffsets.iterator();
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                advance();
                return this.current.hasNext();
            }

            @Override // java.util.Iterator
            public StateChange next() {
                advance();
                return this.current.next();
            }

            private void advance() {
                while (!this.current.hasNext() && this.handleIterator.hasNext()) {
                    Tuple2<StreamStateHandle, Long> next = this.handleIterator.next();
                    try {
                        this.current = stateChangeStreamReader.read(next.f0, next.f1.longValue());
                    } catch (IOException e) {
                        ExceptionUtils.rethrow(e);
                    }
                }
            }

            @Override // java.lang.AutoCloseable
            public void close() throws Exception {
                this.current.close();
            }
        };
    }

    @Override // org.apache.flink.runtime.state.StateObject
    public void discardState() {
        this.handlesAndOffsets.forEach(tuple2 -> {
            this.stateRegistry.unregisterReference(getKey((StreamStateHandle) tuple2.f0));
        });
    }

    @Override // org.apache.flink.runtime.state.StateObject
    public long getStateSize() {
        return 0L;
    }

    private static SharedStateRegistryKey getKey(StreamStateHandle streamStateHandle) {
        return streamStateHandle instanceof FileStateHandle ? new SharedStateRegistryKey(((FileStateHandle) streamStateHandle).getFilePath().toString()) : streamStateHandle instanceof ByteStreamStateHandle ? new SharedStateRegistryKey(((ByteStreamStateHandle) streamStateHandle).getHandleName()) : new SharedStateRegistryKey(Integer.toString(System.identityHashCode(streamStateHandle)));
    }
}
