package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.tukaani.xz.common.Util;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.class */
/**
 * Stream operator that re-stamps every incoming record with a timestamp obtained from a
 * user-supplied {@link AssignerWithPeriodicWatermarks} and, driven by a processing-time timer,
 * periodically asks the same assigner for a watermark to emit downstream.
 *
 * <p>Watermarks produced by this operator are strictly increasing: a watermark is only emitted
 * when its timestamp is greater than the last one this operator emitted. Watermarks arriving from
 * upstream are dropped, except for a {@code Long.MAX_VALUE} watermark, which is forwarded once.
 *
 * @param <T> type of the elements flowing through the operator (input and output are identical)
 */
public class TimestampsAndPeriodicWatermarksOperator<T> extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>> implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
    private static final long serialVersionUID = 1;

    /** Delay between two watermark timers, read from the execution config in {@link #open()}. */
    private transient long watermarkInterval;

    /** Timestamp of the last watermark emitted by this operator; {@code Long.MIN_VALUE} initially. */
    private transient long currentWatermark;

    /**
     * Creates the operator around the given assigner and marks it as always chainable.
     *
     * @param assignerWithPeriodicWatermarks user function that extracts timestamps and supplies watermarks
     */
    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks) {
        super(assignerWithPeriodicWatermarks);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    /**
     * Resets the watermark state, reads the auto-watermark interval from the execution config and,
     * if that interval is positive, schedules the first watermark timer.
     */
    @Override
    public void open() throws Exception {
        super.open();
        this.currentWatermark = Long.MIN_VALUE;
        this.watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        // A non-positive interval means no timer is ever registered, so no periodic watermarks.
        if (this.watermarkInterval > 0) {
            registerNextWatermarkTimer();
        }
    }

    /**
     * Forwards the record with its timestamp replaced by the one the assigner extracts.
     *
     * @param streamRecord incoming record; its previous timestamp (or {@code Long.MIN_VALUE} when
     *     it has none) is handed to the assigner alongside the value
     */
    @Override
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        final T value = streamRecord.getValue();
        final long previousTimestamp = streamRecord.hasTimestamp() ? streamRecord.getTimestamp() : Long.MIN_VALUE;
        final long newTimestamp = this.userFunction.extractTimestamp(value, previousTimestamp);
        this.output.collect(streamRecord.replace(value, newTimestamp));
    }

    /**
     * Timer callback: emits the assigner's current watermark if it advanced, then schedules the
     * next timer one interval after the current processing time.
     *
     * @param timestamp time the timer fired for; not used by this operator
     */
    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        emitWatermarkIfAdvanced();
        registerNextWatermarkTimer();
    }

    /**
     * Drops upstream watermarks, with one exception: a {@code Long.MAX_VALUE} watermark is
     * forwarded the first time it is seen (presumably the end-of-input marker, so that downstream
     * watermark progress is not blocked).
     *
     * @param watermark watermark received from upstream
     */
    @Override
    public void processWatermark(Watermark watermark) throws Exception {
        if (watermark.getTimestamp() == Long.MAX_VALUE && this.currentWatermark != Long.MAX_VALUE) {
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(watermark);
        }
    }

    /**
     * Closes the operator and then emits one final watermark from the assigner if it is ahead of
     * the last one emitted.
     */
    @Override
    public void close() throws Exception {
        super.close();
        emitWatermarkIfAdvanced();
    }

    /**
     * Asks the assigner for its current watermark and emits it only when it is non-null and
     * strictly greater than the last emitted watermark, recording the new timestamp.
     */
    private void emitWatermarkIfAdvanced() {
        final Watermark newWatermark = this.userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > this.currentWatermark) {
            this.currentWatermark = newWatermark.getTimestamp();
            this.output.emitWatermark(newWatermark);
        }
    }

    /** Registers this operator as a timer callback one watermark interval from "now". */
    private void registerNextWatermarkTimer() {
        final long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + this.watermarkInterval, this);
    }
}
