package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkOperatorFactory.class */
public final class SinkOperatorFactory<InputT, CommT, WriterStateT> extends AbstractStreamOperatorFactory<byte[]> implements OneInputStreamOperatorFactory<InputT, byte[]>, YieldingOperatorFactory<byte[]> {
    private final Sink<InputT, CommT, WriterStateT, ?> sink;
    private final boolean batch;
    private final boolean shouldEmit;

    public SinkOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink, boolean z, boolean z2) {
        this.sink = sink;
        this.batch = z;
        this.shouldEmit = z2;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public <T extends StreamOperator<byte[]>> T createStreamOperator(StreamOperatorParameters<byte[]> streamOperatorParameters) {
        Optional<SimpleVersionedSerializer<WriterStateT>> writerStateSerializer = this.sink.getWriterStateSerializer();
        SinkWriterStateHandler statefulSinkWriterStateHandler = writerStateSerializer.isPresent() ? new StatefulSinkWriterStateHandler(writerStateSerializer.get(), this.sink.getCompatibleStateNames()) : StatelessSinkWriterStateHandler.getInstance();
        Optional<SimpleVersionedSerializer<CommT>> committableSerializer = this.sink.getCommittableSerializer();
        CommitterHandler forwardCommittingHandler = this.shouldEmit ? new ForwardCommittingHandler() : NoopCommitterHandler.getInstance();
        if (!this.batch) {
            try {
                Optional<Committer<CommT>> createCommitter = this.sink.createCommitter();
                if (createCommitter.isPresent()) {
                    forwardCommittingHandler = new StreamingCommitterHandler(createCommitter.get(), committableSerializer.orElseThrow(this::noSerializerFound));
                }
            } catch (Exception e) {
                throw new IllegalStateException("Cannot create committer of " + this.sink, e);
            }
        }
        ProcessingTimeService processingTimeService = this.processingTimeService;
        MailboxExecutor mailboxExecutor = getMailboxExecutor();
        Sink<InputT, CommT, WriterStateT, ?> sink = this.sink;
        sink.getClass();
        SinkOperator sinkOperator = new SinkOperator(processingTimeService, mailboxExecutor, sink::createWriter, statefulSinkWriterStateHandler, forwardCommittingHandler, this.shouldEmit ? committableSerializer.orElseThrow(this::noSerializerFound) : null);
        sinkOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
        return sinkOperator;
    }

    private IllegalStateException noSerializerFound() {
        return new IllegalStateException(this.sink.getClass() + " does not implement getCommittableSerializer which is needed for any (global) committer.");
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return SinkOperator.class;
    }

    @VisibleForTesting
    public Sink<InputT, CommT, WriterStateT, ?> getSink() {
        return this.sink;
    }
}
