package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/MailboxWatermarkProcessor.class */
public class MailboxWatermarkProcessor<OUT> {
    protected static final Logger LOG = LoggerFactory.getLogger(MailboxWatermarkProcessor.class);
    private final Output<StreamRecord<OUT>> output;
    private final MailboxExecutor mailboxExecutor;
    private final InternalTimeServiceManager<?> internalTimeServiceManager;
    private boolean progressWatermarkScheduled = false;
    private Watermark maxInputWatermark = Watermark.UNINITIALIZED;

    public MailboxWatermarkProcessor(Output<StreamRecord<OUT>> output, MailboxExecutor mailboxExecutor, InternalTimeServiceManager<?> internalTimeServiceManager) {
        this.output = (Output) Preconditions.checkNotNull(output);
        this.mailboxExecutor = (MailboxExecutor) Preconditions.checkNotNull(mailboxExecutor);
        this.internalTimeServiceManager = (InternalTimeServiceManager) Preconditions.checkNotNull(internalTimeServiceManager);
    }

    public void emitWatermarkInsideMailbox(Watermark watermark) throws Exception {
        this.maxInputWatermark = new Watermark(Math.max(this.maxInputWatermark.getTimestamp(), watermark.getTimestamp()));
        emitWatermarkInsideMailbox();
    }

    private void emitWatermarkInsideMailbox() throws Exception {
        InternalTimeServiceManager<?> internalTimeServiceManager = this.internalTimeServiceManager;
        Watermark watermark = this.maxInputWatermark;
        MailboxExecutor mailboxExecutor = this.mailboxExecutor;
        mailboxExecutor.getClass();
        if (internalTimeServiceManager.tryAdvanceWatermark(watermark, mailboxExecutor::shouldInterrupt)) {
            this.output.emitWatermark(this.maxInputWatermark);
        } else if (this.progressWatermarkScheduled) {
            LOG.debug("emitWatermarkInsideMailbox is already scheduled, skipping.");
        } else {
            this.progressWatermarkScheduled = true;
            this.mailboxExecutor.execute(MailboxExecutor.MailOptions.deferrable(), () -> {
                this.progressWatermarkScheduled = false;
                emitWatermarkInsideMailbox();
            }, "emitWatermarkInsideMailbox");
        }
    }
}
