package org.apache.flink.streaming.api.operators.source;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
import org.apache.flink.util.Preconditions;

/**
 * A {@link SourceOutput} that, for every collected record, extracts a timestamp, forwards the
 * record to a {@link PushingAsyncDataInput.DataOutput} and then feeds the record to a
 * {@link WatermarkGenerator}.
 *
 * <p>Two separate {@link WatermarkOutput}s are used: one handed to the generator on each event
 * (and used for the directly delegated {@link #emitWatermark}, {@link #markIdle} and
 * {@link #markActive} calls), and one handed to the generator on periodic emission.
 *
 * <p>A single {@link StreamRecord} instance is reused for all emitted records, so the object
 * passed to the records output is the same on every call to {@code collect}.
 *
 * @param <T> type of the records collected by this output
 */
@Internal
public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {

    /** Downstream sink for the timestamped records. */
    private final PushingAsyncDataInput.DataOutput<T> recordsOutput;

    /** Determines the timestamp attached to each emitted record. */
    private final TimestampAssigner<T> timestampAssigner;

    /** Notified of every record and of every periodic emission trigger. */
    private final WatermarkGenerator<T> watermarkGenerator;

    /** Output passed to the generator in {@code onEvent}; also the target of direct watermark calls. */
    private final WatermarkOutput onEventWatermarkOutput;

    /** Output passed to the generator in {@code onPeriodicEmit}. */
    private final WatermarkOutput periodicWatermarkOutput;

    /** Record wrapper reused across all {@code collect} calls; its content is replaced per record. */
    private final StreamRecord<T> reusingRecord = new StreamRecord<>(null);

    /**
     * Creates a new output. All arguments are checked with {@link Preconditions#checkNotNull}.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was originally
     * {@code protected}; it is kept {@code public} here so existing callers keep compiling.
     *
     * @param dataOutput output that receives the timestamped records
     * @param watermarkOutput watermark output used for per-event generation and for the directly
     *     delegated {@code emitWatermark}/{@code markIdle}/{@code markActive} calls
     * @param watermarkOutput2 watermark output used for periodic generation
     * @param timestampAssigner assigner used to extract each record's timestamp
     * @param watermarkGenerator generator invoked per event and on periodic emission
     */
    public SourceOutputWithWatermarks(PushingAsyncDataInput.DataOutput<T> dataOutput, WatermarkOutput watermarkOutput, WatermarkOutput watermarkOutput2, TimestampAssigner<T> timestampAssigner, WatermarkGenerator<T> watermarkGenerator) {
        // No casts needed: checkNotNull is generic and returns its argument's static type.
        this.recordsOutput = Preconditions.checkNotNull(dataOutput);
        this.onEventWatermarkOutput = Preconditions.checkNotNull(watermarkOutput);
        this.periodicWatermarkOutput = Preconditions.checkNotNull(watermarkOutput2);
        this.timestampAssigner = Preconditions.checkNotNull(timestampAssigner);
        this.watermarkGenerator = Preconditions.checkNotNull(watermarkGenerator);
    }

    /**
     * Collects a record without a caller-provided timestamp.
     *
     * <p>Delegates to {@link #collect(Object, long)} with {@link Long#MIN_VALUE} as the timestamp
     * (presumably the "no timestamp" sentinel understood by the assigner; confirm against
     * {@code TimestampAssigner}).
     *
     * @param t the record to emit
     */
    @Override // org.apache.flink.api.connector.source.SourceOutput
    public final void collect(T t) {
        collect(t, Long.MIN_VALUE);
    }

    /**
     * Extracts the record's timestamp, emits the record, then notifies the watermark generator.
     *
     * @param t the record to emit
     * @param j the timestamp handed to the timestamp assigner as the record's current timestamp
     * @throws ExceptionInChainedOperatorException if any step throws; an exception that is already
     *     of this type is rethrown as is, any other {@link Exception} is wrapped as the cause
     */
    @Override // org.apache.flink.api.connector.source.SourceOutput
    public final void collect(T t, long j) {
        try {
            final long assignedTimestamp = this.timestampAssigner.extractTimestamp(t, j);
            // Keep this order: the record is emitted before the generator sees the event, so a
            // watermark produced in onEvent is emitted after the record it was derived from.
            this.recordsOutput.emitRecord(this.reusingRecord.replace(t, assignedTimestamp));
            this.watermarkGenerator.onEvent(t, assignedTimestamp, this.onEventWatermarkOutput);
        } catch (ExceptionInChainedOperatorException e) {
            // Already the expected type; rethrow untouched to avoid double wrapping.
            throw e;
        } catch (Exception e) {
            throw new ExceptionInChainedOperatorException(e);
        }
    }

    /**
     * Forwards the watermark directly to the on-event watermark output, bypassing the generator.
     *
     * @param watermark the watermark to emit
     */
    @Override // org.apache.flink.api.common.eventtime.WatermarkOutput
    public final void emitWatermark(Watermark watermark) {
        this.onEventWatermarkOutput.emitWatermark(watermark);
    }

    /** Forwards the idle signal to the on-event watermark output. */
    @Override // org.apache.flink.api.common.eventtime.WatermarkOutput
    public final void markIdle() {
        this.onEventWatermarkOutput.markIdle();
    }

    /** Forwards the active signal to the on-event watermark output. */
    @Override // org.apache.flink.api.common.eventtime.WatermarkOutput
    public void markActive() {
        this.onEventWatermarkOutput.markActive();
    }

    /** Asks the watermark generator to perform a periodic emission into the periodic output. */
    public final void emitPeriodicWatermark() {
        this.watermarkGenerator.onPeriodicEmit(this.periodicWatermarkOutput);
    }

    /**
     * Factory that creates an output with distinct on-event and periodic watermark outputs.
     *
     * @param dataOutput output that receives the timestamped records
     * @param watermarkOutput watermark output used for per-event generation
     * @param watermarkOutput2 watermark output used for periodic generation
     * @param timestampAssigner assigner used to extract each record's timestamp
     * @param watermarkGenerator generator invoked per event and on periodic emission
     * @param <E> type of the records collected by the created output
     * @return a new {@code SourceOutputWithWatermarks} wired with the given components
     */
    public static <E> SourceOutputWithWatermarks<E> createWithSeparateOutputs(PushingAsyncDataInput.DataOutput<E> dataOutput, WatermarkOutput watermarkOutput, WatermarkOutput watermarkOutput2, TimestampAssigner<E> timestampAssigner, WatermarkGenerator<E> watermarkGenerator) {
        return new SourceOutputWithWatermarks<>(dataOutput, watermarkOutput, watermarkOutput2, timestampAssigner, watermarkGenerator);
    }
}
