package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorFactory.class */
public final class CommitterOperatorFactory<CommT, GlobalCommT> extends AbstractStreamOperatorFactory<byte[]> implements OneInputStreamOperatorFactory<byte[], byte[]> {
    private final Sink<?, CommT, ?, GlobalCommT> sink;
    private final boolean batch;

    public CommitterOperatorFactory(Sink<?, CommT, ?, GlobalCommT> sink, boolean z) {
        this.sink = sink;
        this.batch = z;
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public <T extends StreamOperator<byte[]>> T createStreamOperator(StreamOperatorParameters<byte[]> streamOperatorParameters) {
        SimpleVersionedSerializer<CommT> orElseThrow = this.sink.getCommittableSerializer().orElseThrow(this::noSerializerFound);
        try {
            CommitterHandler<CommT, GlobalCommT> globalCommitterHandler = getGlobalCommitterHandler();
            if (this.batch) {
                Optional<Committer<CommT>> createCommitter = this.sink.createCommitter();
                if (createCommitter.isPresent()) {
                    globalCommitterHandler = new BatchCommitterHandler(createCommitter.get(), globalCommitterHandler);
                }
            }
            Preconditions.checkState(!(globalCommitterHandler instanceof NoopCommitterHandler), "committer operator without commmitter");
            CommitterOperator committerOperator = new CommitterOperator(this.processingTimeService, orElseThrow, globalCommitterHandler);
            committerOperator.setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            return committerOperator;
        } catch (Exception e) {
            throw new IllegalStateException("Cannot create commit operator of " + this.sink, e);
        }
    }

    private IllegalStateException noSerializerFound() {
        return new IllegalStateException(this.sink.getClass() + " does not implement getCommittableSerializer which is needed for any (global) committer.");
    }

    private CommitterHandler<CommT, GlobalCommT> getGlobalCommitterHandler() throws IOException {
        Optional<GlobalCommitter<CommT, GlobalCommT>> createGlobalCommitter = this.sink.createGlobalCommitter();
        if (!createGlobalCommitter.isPresent()) {
            return NoopCommitterHandler.getInstance();
        }
        if (this.batch) {
            return new GlobalBatchCommitterHandler(createGlobalCommitter.get());
        }
        return new GlobalStreamingCommitterHandler(createGlobalCommitter.get(), this.sink.getGlobalCommittableSerializer().orElseThrow(this::noGlobalSerializerFound));
    }

    private IllegalStateException noGlobalSerializerFound() {
        return new IllegalStateException(this.sink.getClass() + " does not implement getGlobalCommittableSerializer which is needed for streaming global committers.");
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperatorFactory
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return CommitterOperator.class;
    }
}
