/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.source.coordinator;

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);
    private final WatermarkAggregator<Integer> combinedWatermark = new WatermarkAggregator();
    private final WatermarkAlignmentParams watermarkAlignmentParams;
    private final String operatorName;
    private final Source<?, SplitT, EnumChkT> source;
    private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
    private final SourceCoordinatorContext<SplitT> context;
    private final CoordinatorStore coordinatorStore;
    private SplitEnumerator<SplitT, EnumChkT> enumerator;
    private boolean started;
    @Nullable
    private final String coordinatorListeningID;

    public SourceCoordinator(String operatorName, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore) {
        this(operatorName, source, context, coordinatorStore, WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED, null);
    }

    public SourceCoordinator(String operatorName, Source<?, SplitT, EnumChkT> source, SourceCoordinatorContext<SplitT> context, CoordinatorStore coordinatorStore, WatermarkAlignmentParams watermarkAlignmentParams, @Nullable String coordinatorListeningID) {
        this.operatorName = operatorName;
        this.source = source;
        this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
        this.context = context;
        this.coordinatorStore = coordinatorStore;
        this.watermarkAlignmentParams = watermarkAlignmentParams;
        this.coordinatorListeningID = coordinatorListeningID;
        if (watermarkAlignmentParams.isEnabled()) {
            if (context.isConcurrentExecutionAttemptsSupported()) {
                throw new IllegalArgumentException("Watermark alignment is not supported in concurrent execution attempts scenario (e.g. if speculative execution is enabled)");
            }
            coordinatorStore.putIfAbsent(watermarkAlignmentParams.getWatermarkGroup(), new WatermarkAggregator());
            context.getCoordinatorExecutor().scheduleAtFixedRate(this::announceCombinedWatermark, watermarkAlignmentParams.getUpdateInterval(), watermarkAlignmentParams.getUpdateInterval(), TimeUnit.MILLISECONDS);
        }
    }

    @VisibleForTesting
    void announceCombinedWatermark() {
        long maxAllowedWatermark;
        Preconditions.checkState((this.watermarkAlignmentParams != WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED ? 1 : 0) != 0);
        Watermark globalCombinedWatermark = this.coordinatorStore.apply(this.watermarkAlignmentParams.getWatermarkGroup(), value -> {
            WatermarkAggregator aggregator = (WatermarkAggregator)value;
            return new Watermark(aggregator.getAggregatedWatermark().getTimestamp());
        });
        try {
            maxAllowedWatermark = Math.addExact(globalCombinedWatermark.getTimestamp(), this.watermarkAlignmentParams.getMaxAllowedWatermarkDrift());
        }
        catch (ArithmeticException e) {
            maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
        }
        Set<Integer> subTaskIds = this.combinedWatermark.keySet();
        LOG.info("Distributing maxAllowedWatermark={} to subTaskIds={}", (Object)maxAllowedWatermark, subTaskIds);
        for (Integer subtaskId : subTaskIds) {
            this.context.sendEventToSourceOperator(subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
        }
    }

    @Override
    public void start() throws Exception {
        LOG.info("Starting split enumerator for source {}.", (Object)this.operatorName);
        this.started = true;
        if (this.enumerator == null) {
            ClassLoader userCodeClassLoader = this.context.getCoordinatorContext().getUserCodeClassloader();
            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of((ClassLoader)userCodeClassLoader);){
                this.enumerator = this.source.createEnumerator(this.context);
            }
            catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
                LOG.error("Failed to create Source Enumerator for source {}", (Object)this.operatorName, (Object)t);
                this.context.failJob(t);
                return;
            }
        }
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.enumerator.start()), "starting the SplitEnumerator.", new Object[0]);
        if (this.coordinatorListeningID != null) {
            this.coordinatorStore.compute(this.coordinatorListeningID, (key, oldValue) -> {
                if (oldValue == null || oldValue instanceof OperatorCoordinator) {
                    return this;
                }
                Preconditions.checkState((boolean)(oldValue instanceof OperatorEvent), (Object)("The existing value for " + this.coordinatorStore + "is expected to be an operator event, but it is in fact " + oldValue));
                LOG.info("Handling event {} received before the source coordinator with ID {} is registered", oldValue, (Object)this.coordinatorListeningID);
                this.handleEventFromOperator(0, 0, (OperatorEvent)oldValue);
                return null;
            });
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("Closing SourceCoordinator for source {}.", (Object)this.operatorName);
        if (this.started) {
            IOUtils.closeAll(Arrays.asList(this.context, this.enumerator), Throwable.class);
        }
        LOG.info("Source coordinator for source {} closed.", (Object)this.operatorName);
    }

    @Override
    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            if (event instanceof RequestSplitEvent) {
                this.handleRequestSplitEvent(subtask, attemptNumber, (RequestSplitEvent)event);
            } else if (event instanceof SourceEventWrapper) {
                this.handleSourceEvent(subtask, attemptNumber, ((SourceEventWrapper)event).getSourceEvent());
            } else if (event instanceof ReaderRegistrationEvent) {
                this.handleReaderRegistrationEvent(subtask, attemptNumber, (ReaderRegistrationEvent)event);
            } else if (event instanceof ReportedWatermarkEvent) {
                this.handleReportedWatermark(subtask, new Watermark(((ReportedWatermarkEvent)event).getWatermark()));
            } else {
                throw new FlinkException("Unrecognized Operator Event: " + event);
            }
        }), "handling operator event %s from subtask %d (#%d)", event, subtask, attemptNumber);
    }

    @Override
    public void executionAttemptFailed(int subtaskId, int attemptNumber, @Nullable Throwable reason) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            LOG.info("Removing registered reader after failure for subtask {} (#{}) of source {}.", new Object[]{subtaskId, attemptNumber, this.operatorName});
            this.context.unregisterSourceReader(subtaskId, attemptNumber);
            this.context.attemptFailed(subtaskId, attemptNumber);
        }), "handling subtask %d (#%d) failure", subtaskId, attemptNumber);
    }

    @Override
    public void subtaskReset(int subtaskId, long checkpointId) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            LOG.info("Recovering subtask {} to checkpoint {} for source {} to checkpoint.", new Object[]{subtaskId, checkpointId, this.operatorName});
            this.context.subtaskReset(subtaskId);
            List<SplitT> splitsToAddBack = this.context.getAndRemoveUncheckpointedAssignment(subtaskId, checkpointId);
            LOG.debug("Adding splits back to the split enumerator of source {}: {}", (Object)this.operatorName, splitsToAddBack);
            this.enumerator.addSplitsBack(splitsToAddBack, subtaskId);
        }), "handling subtask %d recovery to checkpoint %d", subtaskId, checkpointId);
    }

    @Override
    public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
        Preconditions.checkArgument((subtask == gateway.getSubtask() ? 1 : 0) != 0);
        Preconditions.checkArgument((attemptNumber == gateway.getExecution().getAttemptNumber() ? 1 : 0) != 0);
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.context.attemptReady(gateway)), "making event gateway to subtask %d (#%d) available", subtask, attemptNumber);
    }

    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", (Object)this.operatorName, (Object)checkpointId);
            try {
                this.context.onCheckpoint(checkpointId);
                result.complete(this.toBytes(checkpointId));
            }
            catch (Throwable e) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)e);
                result.completeExceptionally(new CompletionException(String.format("Failed to checkpoint SplitEnumerator for source %s", this.operatorName), e));
            }
        }), "taking checkpoint %d", checkpointId);
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            LOG.info("Marking checkpoint {} as completed for source {}.", (Object)checkpointId, (Object)this.operatorName);
            this.context.onCheckpointComplete(checkpointId);
            this.enumerator.notifyCheckpointComplete(checkpointId);
        }), "notifying the enumerator of completion of checkpoint %d", checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            LOG.info("Marking checkpoint {} as aborted for source {}.", (Object)checkpointId, (Object)this.operatorName);
            this.enumerator.notifyCheckpointAborted(checkpointId);
        }), "calling notifyCheckpointAborted()", new Object[0]);
    }

    @Override
    public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        Preconditions.checkState((!this.started ? 1 : 0) != 0, (Object)"The coordinator can only be reset if it was not yet started");
        assert (this.enumerator == null);
        if (checkpointData == null) {
            return;
        }
        LOG.info("Restoring SplitEnumerator of source {} from checkpoint.", (Object)this.operatorName);
        ClassLoader userCodeClassLoader = this.context.getCoordinatorContext().getUserCodeClassloader();
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of((ClassLoader)userCodeClassLoader);){
            EnumChkT enumeratorCheckpoint = this.deserializeCheckpoint(checkpointData);
            this.enumerator = this.source.restoreEnumerator(this.context, enumeratorCheckpoint);
        }
    }

    private void runInEventLoop(ThrowingRunnable<Throwable> action, String actionName, Object ... actionNameFormatParameters) {
        this.ensureStarted();
        if (this.enumerator == null) {
            return;
        }
        this.context.runInCoordinatorThread(() -> {
            try {
                action.run();
            }
            catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
                String actionString = String.format(actionName, actionNameFormatParameters);
                LOG.error("Uncaught exception in the SplitEnumerator for Source {} while {}. Triggering job failover.", new Object[]{this.operatorName, actionString, t});
                this.context.failJob(t);
            }
        });
    }

    @VisibleForTesting
    SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
        return this.enumerator;
    }

    @VisibleForTesting
    SourceCoordinatorContext<SplitT> getContext() {
        return this.context;
    }

    private byte[] toBytes(long checkpointId) throws Exception {
        return SourceCoordinator.writeCheckpointBytes(this.enumerator.snapshotState(checkpointId), this.enumCheckpointSerializer);
    }

    /*
     * Exception decompiling
     */
    static <EnumChkT> byte[] writeCheckpointBytes(EnumChkT enumeratorCheckpoint, SimpleVersionedSerializer<EnumChkT> enumeratorCheckpointSerializer) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /*
     * Exception decompiling
     */
    private EnumChkT deserializeCheckpoint(byte[] bytes) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void handleRequestSplitEvent(int subtask, int attemptNumber, RequestSplitEvent event) {
        LOG.info("Source {} received split request from parallel task {} (#{})", new Object[]{this.operatorName, subtask, attemptNumber});
        if (!this.context.hasNoMoreSplits(subtask)) {
            this.enumerator.handleSplitRequest(subtask, event.hostName());
        }
    }

    private void handleSourceEvent(int subtask, int attemptNumber, SourceEvent event) {
        LOG.debug("Source {} received custom event from parallel task {} (#{}): {}", new Object[]{this.operatorName, subtask, attemptNumber, event});
        if (this.context.isConcurrentExecutionAttemptsSupported()) {
            Preconditions.checkState((boolean)(this.enumerator instanceof SupportsHandleExecutionAttemptSourceEvent), (String)"The split enumerator %s must implement SupportsHandleExecutionAttemptSourceEvent to be used in concurrent execution attempts scenario (e.g. if speculative execution is enabled).", (Object[])new Object[]{this.enumerator.getClass().getCanonicalName()});
            ((SupportsHandleExecutionAttemptSourceEvent)this.enumerator).handleSourceEvent(subtask, attemptNumber, event);
        } else {
            this.enumerator.handleSourceEvent(subtask, event);
        }
    }

    private void handleReaderRegistrationEvent(int subtask, int attemptNumber, ReaderRegistrationEvent event) {
        Preconditions.checkArgument((subtask == event.subtaskId() ? 1 : 0) != 0);
        LOG.info("Source {} registering reader for parallel task {} (#{}) @ {}", new Object[]{this.operatorName, subtask, attemptNumber, event.location()});
        boolean subtaskReaderExisted = this.context.registeredReadersOfAttempts().containsKey(subtask);
        this.context.registerSourceReader(subtask, attemptNumber, event.location());
        if (!subtaskReaderExisted) {
            this.enumerator.addReader(event.subtaskId());
        }
    }

    private void handleReportedWatermark(int subtask, Watermark watermark) throws FlinkException {
        if (this.context.isConcurrentExecutionAttemptsSupported()) {
            throw new FlinkException("ReportedWatermarkEvent is not supported in concurrent execution attempts scenario (e.g. if speculative execution is enabled)");
        }
        LOG.debug("New reported watermark={} from subTaskId={}", (Object)watermark, (Object)subtask);
        Preconditions.checkState((boolean)this.watermarkAlignmentParams.isEnabled());
        this.combinedWatermark.aggregate(subtask, watermark).ifPresent(newCombinedWatermark -> this.coordinatorStore.computeIfPresent(this.watermarkAlignmentParams.getWatermarkGroup(), (key, oldValue) -> {
            WatermarkAggregator watermarkAggregator = (WatermarkAggregator)oldValue;
            watermarkAggregator.aggregate(this.operatorName, (Watermark)newCombinedWatermark);
            return watermarkAggregator;
        }));
    }

    private void ensureStarted() {
        if (!this.started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }

    private static class WatermarkAggregator<T> {
        private final Map<T, Watermark> watermarks = new HashMap<T, Watermark>();
        private Watermark aggregatedWatermark = new Watermark(Long.MIN_VALUE);

        private WatermarkAggregator() {
        }

        public Optional<Watermark> aggregate(T key, Watermark watermark) {
            this.watermarks.put(key, watermark);
            Watermark newMinimum = this.watermarks.values().stream().min(Comparator.comparingLong(Watermark::getTimestamp)).orElseThrow(IllegalStateException::new);
            if (newMinimum.equals((Object)this.aggregatedWatermark)) {
                return Optional.empty();
            }
            this.aggregatedWatermark = newMinimum;
            return Optional.of(this.aggregatedWatermark);
        }

        public Set<T> keySet() {
            return this.watermarks.keySet();
        }

        public Watermark getAggregatedWatermark() {
            return this.aggregatedWatermark;
        }
    }
}

