package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImpl.class */
public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImpl<K, N> {
    private AsyncExecutionController<K> asyncExecutionController;

    /* JADX INFO: Access modifiers changed from: package-private */
    public InternalTimerServiceAsyncImpl(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> keyGroupedInternalPriorityQueue, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> keyGroupedInternalPriorityQueue2, StreamTaskCancellationContext streamTaskCancellationContext, AsyncExecutionController<K> asyncExecutionController) {
        super(taskIOMetricGroup, keyGroupRange, keyContext, processingTimeService, keyGroupedInternalPriorityQueue, keyGroupedInternalPriorityQueue2, streamTaskCancellationContext);
        this.asyncExecutionController = asyncExecutionController;
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
    void onProcessingTime(long j) throws Exception {
        TimerHeapInternalTimer<K, N> peek;
        this.nextTimer = null;
        while (true) {
            peek = this.processingTimeTimersQueue.peek();
            if (peek == null || peek.getTimestamp() > j || this.cancellationContext.isCancelled()) {
                break;
            }
            this.processingTimeTimersQueue.poll();
            maintainContextAndProcess(peek, () -> {
                this.triggerTarget.onProcessingTime(peek);
            });
            this.taskIOMetricGroup.getNumFiredTimers().inc();
        }
        if (peek == null || this.nextTimer != null) {
            return;
        }
        this.nextTimer = this.processingTimeService.registerTimer(peek.getTimestamp(), this::onProcessingTime);
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
    public void advanceWatermark(long j) throws Exception {
        this.currentWatermark = j;
        while (true) {
            TimerHeapInternalTimer<K, N> peek = this.eventTimeTimersQueue.peek();
            if (peek == null || peek.getTimestamp() > j || this.cancellationContext.isCancelled()) {
                return;
            }
            this.eventTimeTimersQueue.poll();
            maintainContextAndProcess(peek, () -> {
                this.triggerTarget.onEventTime(peek);
            });
            this.taskIOMetricGroup.getNumFiredTimers().inc();
        }
    }

    @Override // org.apache.flink.streaming.api.operators.InternalTimerServiceImpl
    protected void foreachTimer(BiConsumerWithException<N, Long, Exception> biConsumerWithException, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> keyGroupedInternalPriorityQueue) throws Exception {
        throw new UnsupportedOperationException("Batch operation is not supported when using async state.");
    }

    private void maintainContextAndProcess(InternalTimer<K, N> internalTimer, ThrowingRunnable<Exception> throwingRunnable) {
        RecordContext<K> buildContext = this.asyncExecutionController.buildContext(null, internalTimer.getKey());
        buildContext.retain();
        this.asyncExecutionController.setCurrentContext(buildContext);
        this.keyContext.setCurrentKey(internalTimer.getKey());
        this.asyncExecutionController.syncPointRequestWithCallback(throwingRunnable);
        buildContext.release();
    }
}
