package org.apache.flink.runtime.asyncprocessing;

import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.class */
public class StateRequestBuffer<K> {
    private static final ScheduledThreadPoolExecutor DELAYER = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("StateRequestBuffer-timeout-scheduler"));
    final long bufferTimeout;
    final Consumer<Long> timeoutHandler;
    ScheduledExecutorService scheduledExecutor;
    ScheduledFuture<Void> currentScheduledFuture;
    final LinkedList<StateRequest<K, ?, ?>> activeQueue = new LinkedList<>();
    final Map<K, Deque<StateRequest<K, ?, ?>>> blockingQueue = new HashMap();
    int blockingQueueSize = 0;
    AtomicLong scheduledSeq = new AtomicLong(-1);
    AtomicLong currentSeq = new AtomicLong(0);

    public StateRequestBuffer(long j, Consumer<Long> consumer) {
        this.bufferTimeout = j;
        this.timeoutHandler = consumer;
        if (j > 0) {
            this.scheduledExecutor = DELAYER;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void advanceSeq() {
        this.currentSeq.incrementAndGet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean checkCurrentSeq(long j) {
        return this.currentSeq.get() == j;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void enqueueToActive(StateRequest<K, ?, ?> stateRequest) {
        if (stateRequest.getRequestType() == StateRequestType.SYNC_POINT) {
            stateRequest.getFuture().complete(null);
            return;
        }
        this.activeQueue.add(stateRequest);
        if (this.bufferTimeout <= 0 || this.currentSeq.get() <= this.scheduledSeq.get()) {
            return;
        }
        if (this.currentScheduledFuture != null && !this.currentScheduledFuture.isDone() && !this.currentScheduledFuture.isCancelled()) {
            this.currentScheduledFuture.cancel(false);
        }
        long j = this.currentSeq.get();
        this.scheduledSeq.set(j);
        this.currentScheduledFuture = this.scheduledExecutor.schedule(() -> {
            if (j == this.currentSeq.get()) {
                this.timeoutHandler.accept(Long.valueOf(j));
            }
        }, this.bufferTimeout, TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void enqueueToBlocking(StateRequest<K, ?, ?> stateRequest) {
        this.blockingQueue.computeIfAbsent(stateRequest.getRecordContext().getKey(), obj -> {
            return new LinkedList();
        }).add(stateRequest);
        this.blockingQueueSize++;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public RecordContext<K> tryActivateOneByKey(K k) {
        if (!this.blockingQueue.containsKey(k)) {
            return null;
        }
        StateRequest<K, ?, ?> removeFirst = this.blockingQueue.get(k).removeFirst();
        enqueueToActive(removeFirst);
        if (this.blockingQueue.get(k).isEmpty()) {
            this.blockingQueue.remove(k);
        }
        this.blockingQueueSize--;
        return removeFirst.getRecordContext();
    }

    int blockingQueueSize() {
        return this.blockingQueueSize;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int activeQueueSize() {
        return this.activeQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<StateRequestContainer> popActive(int i, Supplier<StateRequestContainer> supplier) {
        int min = Math.min(i, this.activeQueue.size());
        if (min <= 0) {
            return Optional.empty();
        }
        StateRequestContainer stateRequestContainer = supplier.get();
        for (int i2 = 0; i2 < min; i2++) {
            stateRequestContainer.offer(this.activeQueue.pop());
        }
        return Optional.of(stateRequestContainer);
    }

    static {
        DELAYER.setRemoveOnCancelPolicy(true);
        DELAYER.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        DELAYER.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }
}
