package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.metrics.ProbeUnit;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.accumulator.LongLongAccumulator;
import com.hazelcast.jet.core.metrics.MetricNames;
import com.hazelcast.jet.core.metrics.MetricTags;
import com.hazelcast.jet.impl.util.AsyncSnapshotWriter;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.logging.ILogger;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/execution/StoreSnapshotTasklet.class */
public class StoreSnapshotTasklet implements Tasklet {
    long pendingSnapshotId;
    private final SnapshotContext snapshotContext;
    private final InboundEdgeStream inboundEdgeStream;
    private final ILogger logger;
    private final String vertexName;
    private final boolean isHigherPrioritySource;
    private final AsyncSnapshotWriter ssWriter;
    private boolean hasReachedBarrier;
    private Map.Entry<Data, Data> pendingEntry;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final ProgressTracker progTracker = new ProgressTracker();
    private final AtomicReference<LongLongAccumulator> metrics = new AtomicReference<>(new LongLongAccumulator());
    private State state = State.DRAIN;
    private Consumer<Object> addToInboxFunction = this::addToInbox;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/StoreSnapshotTasklet$State.class */
    public enum State {
        DRAIN,
        FLUSH,
        REACHED_BARRIER,
        DONE
    }

    public StoreSnapshotTasklet(SnapshotContext snapshotContext, InboundEdgeStream inboundEdgeStream, AsyncSnapshotWriter asyncSnapshotWriter, ILogger iLogger, String str, boolean z) {
        this.snapshotContext = snapshotContext;
        this.inboundEdgeStream = inboundEdgeStream;
        this.logger = iLogger;
        this.vertexName = str;
        this.isHigherPrioritySource = z;
        this.ssWriter = asyncSnapshotWriter;
        this.pendingSnapshotId = snapshotContext.activeSnapshotIdPhase1() + 1;
    }

    @Override // com.hazelcast.jet.impl.execution.Tasklet
    @Nonnull
    public ProgressState call() {
        this.progTracker.reset();
        stateMachineStep();
        return this.progTracker.toProgressState();
    }

    private void stateMachineStep() {
        switch (this.state) {
            case DRAIN:
                this.progTracker.notDone();
                if (this.pendingEntry != null) {
                    if (!this.ssWriter.offer(this.pendingEntry)) {
                        return;
                    } else {
                        this.progTracker.madeProgress();
                    }
                }
                this.pendingEntry = null;
                ProgressState drainTo = this.inboundEdgeStream.drainTo(this.addToInboxFunction);
                if (drainTo.isDone()) {
                    if (!$assertionsDisabled && !this.ssWriter.isEmpty()) {
                        throw new AssertionError("input is done, but we had some entries and not the barrier");
                    }
                    this.snapshotContext.storeSnapshotTaskletDone(this.pendingSnapshotId - 1, this.isHigherPrioritySource);
                    this.state = State.DONE;
                    this.progTracker.reset();
                }
                this.progTracker.madeProgress(drainTo.isMadeProgress());
                if (this.hasReachedBarrier) {
                    this.state = State.FLUSH;
                    stateMachineStep();
                    return;
                }
                return;
            case FLUSH:
                this.progTracker.notDone();
                if (this.ssWriter.flushAndResetMap()) {
                    this.progTracker.madeProgress();
                    this.state = State.REACHED_BARRIER;
                    return;
                }
                return;
            case REACHED_BARRIER:
                if (this.ssWriter.hasPendingAsyncOps()) {
                    this.progTracker.notDone();
                    return;
                }
                Throwable error = this.ssWriter.getError();
                if (error != null) {
                    this.logger.severe("Error writing to snapshot map", error);
                    this.snapshotContext.reportError(error);
                }
                this.progTracker.madeProgress();
                long totalPayloadBytes = this.ssWriter.getTotalPayloadBytes();
                long totalKeys = this.ssWriter.getTotalKeys();
                this.snapshotContext.phase1DoneForTasklet(totalPayloadBytes, totalKeys, this.ssWriter.getTotalChunks());
                this.metrics.set(new LongLongAccumulator(totalPayloadBytes, totalKeys));
                this.ssWriter.resetStats();
                this.pendingSnapshotId++;
                this.hasReachedBarrier = false;
                this.state = State.DRAIN;
                this.progTracker.notDone();
                return;
            default:
                throw new JetException("Unexpected state: " + this.state);
        }
    }

    private boolean addToInbox(Object obj) {
        if (!(obj instanceof SnapshotBarrier)) {
            if (this.ssWriter.offer((Map.Entry) obj)) {
                return true;
            }
            this.pendingEntry = (Map.Entry) obj;
            return false;
        }
        SnapshotBarrier snapshotBarrier = (SnapshotBarrier) obj;
        if ($assertionsDisabled || this.pendingSnapshotId == snapshotBarrier.snapshotId()) {
            this.hasReachedBarrier = true;
            return true;
        }
        AssertionError assertionError = new AssertionError("Unexpected barrier, expected was " + this.pendingSnapshotId + ", but barrier was " + assertionError + ", this=" + snapshotBarrier.snapshotId());
        throw assertionError;
    }

    @Override // com.hazelcast.jet.impl.execution.Tasklet, com.hazelcast.internal.metrics.DynamicMetricsProvider
    public void provideDynamicMetrics(MetricDescriptor metricDescriptor, MetricsCollectionContext metricsCollectionContext) {
        MetricDescriptor withTag = metricDescriptor.withTag(MetricTags.VERTEX, this.vertexName);
        LongLongAccumulator longLongAccumulator = this.metrics.get();
        metricsCollectionContext.collect(withTag, MetricNames.SNAPSHOT_BYTES, ProbeLevel.INFO, ProbeUnit.COUNT, longLongAccumulator.get1());
        metricsCollectionContext.collect(withTag, MetricNames.SNAPSHOT_KEYS, ProbeLevel.INFO, ProbeUnit.COUNT, longLongAccumulator.get2());
    }

    public String toString() {
        return StoreSnapshotTasklet.class.getSimpleName() + "{" + this.vertexName + "}";
    }

    static {
        $assertionsDisabled = !StoreSnapshotTasklet.class.desiredAssertionStatus();
    }
}
