package org.apache.flink.runtime.asyncprocessing;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.state.v2.StateIterator;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureUtils;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AbstractStateIterator.class */
/**
 * Skeleton of a {@link StateIterator} that holds an already-loaded batch of elements
 * ({@link #cache}) and, while {@link #hasNext()} reports more data, requests the following batch
 * through the {@link StateRequestHandler} and chains the caller's action onto it.
 *
 * @param <T> type of the elements produced by the iteration
 */
public abstract class AbstractStateIterator<T> implements StateIterator<T> {
    /** State handle passed along with every follow-up load request. */
    final State originalState;

    /** Request type supplied at construction; exposed to subclasses via {@link #getRequestType()}. */
    final StateRequestType requestType;

    /** Handler that receives the follow-up {@code ITERATOR_LOADING} requests. */
    final StateRequestHandler stateHandler;

    /** Elements already available to this iterator; may be {@code null} or empty. */
    final Collection<T> cache;

    /**
     * Creates an iterator over the given batch of elements.
     *
     * @param state state handle forwarded with follow-up load requests
     * @param stateRequestType request type associated with this iterator
     * @param stateRequestHandler handler used to issue follow-up load requests
     * @param collection elements already available; {@code null} is treated like an empty batch
     */
    public AbstractStateIterator(State state, StateRequestType stateRequestType, StateRequestHandler stateRequestHandler, Collection<T> collection) {
        this.originalState = state;
        this.requestType = stateRequestType;
        this.stateHandler = stateRequestHandler;
        this.cache = collection;
    }

    /**
     * Tells whether further elements have to be loaded once the current {@link #cache} is consumed.
     *
     * @return {@code true} if a follow-up load request should be issued
     */
    protected abstract boolean hasNext();

    /**
     * Builds the payload that is handed to the {@link StateRequestHandler} for the follow-up load
     * request.
     *
     * @return payload object for the next {@code ITERATOR_LOADING} request
     */
    protected abstract Object nextPayloadForContinuousLoading();

    /**
     * Returns the request type this iterator was constructed with.
     *
     * @return the stored {@link StateRequestType}
     */
    protected StateRequestType getRequestType() {
        return this.requestType;
    }

    /**
     * Issues an {@code ITERATOR_LOADING} request for the next batch of elements.
     *
     * @return future completing with the iterator over the next batch
     */
    private InternalStateFuture<StateIterator<T>> asyncNextLoad() {
        return this.stateHandler.handleRequest(this.originalState, StateRequestType.ITERATOR_LOADING, nextPayloadForContinuousLoading());
    }

    /**
     * Applies {@code function} to every cached element and, if more elements remain, to the
     * elements of all following batches, combining the individual results.
     *
     * @param function asynchronous action to run for each element
     * @param <U> result type produced per element
     * @return future of all per-element results: this batch's results first, followed by the
     *     results of the subsequent batches; an already completed empty collection if the
     *     iterator is empty
     */
    @Override // org.apache.flink.api.common.state.v2.StateIterator
    public <U> StateFuture<Collection<U>> onNext(Function<T, StateFuture<? extends U>> function) {
        if (isEmpty()) {
            return StateFutureUtils.completedFuture(Collections.emptyList());
        }
        Collection<StateFuture<? extends U>> pendingResults = new ArrayList<>();
        // isEmpty() tolerates a null cache; when hasNext() is true we can get here with
        // cache == null, so skip the local pass instead of dereferencing it.
        if (this.cache != null) {
            for (T element : this.cache) {
                pendingResults.add(function.apply(element));
            }
        }
        if (!hasNext()) {
            return StateFutureUtils.combineAll(pendingResults);
        }
        // Chain the next batch and concatenate its results after the ones of this batch.
        return StateFutureUtils.combineAll(pendingResults).thenCombine(
                asyncNextLoad().thenCompose(nextIterator -> nextIterator.onNext(function)),
                (currentResults, followingResults) -> {
                    Collection<U> merged = new ArrayList<>(currentResults.size() + followingResults.size());
                    merged.addAll(currentResults);
                    merged.addAll(followingResults);
                    return merged;
                });
    }

    /**
     * Feeds every cached element to {@code consumer} and, if more elements remain, continues with
     * the following batches.
     *
     * @param consumer action to run for each element
     * @return future that completes once the elements of this and all following batches have
     *     been passed to {@code consumer}; already completed if the iterator is empty or no
     *     further batch exists
     */
    @Override // org.apache.flink.api.common.state.v2.StateIterator
    public StateFuture<Void> onNext(Consumer<T> consumer) {
        if (isEmpty()) {
            return StateFutureUtils.completedVoidFuture();
        }
        // Same null tolerance as isEmpty(): a null cache simply contributes no elements.
        if (this.cache != null) {
            for (T element : this.cache) {
                consumer.accept(element);
            }
        }
        if (!hasNext()) {
            return StateFutureUtils.completedVoidFuture();
        }
        return asyncNextLoad().thenCompose(nextIterator -> nextIterator.onNext(consumer));
    }

    /**
     * Reports whether there is nothing to iterate: the cache is {@code null} or empty and no
     * further batch is pending.
     *
     * @return {@code true} if neither cached nor loadable elements exist
     */
    @Override // org.apache.flink.api.common.state.v2.StateIterator
    public boolean isEmpty() {
        return (this.cache == null || this.cache.isEmpty()) && !hasNext();
    }
}
