package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.curator5.com.google.common.collect.Sets;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.api.CuratorEventType;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.SharedCount;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.shared.VersionedValue;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.class */
/**
 * {@link CheckpointIDCounter} backed by a Curator {@link SharedCount} stored in ZooKeeper.
 *
 * <p>The count is kept as an {@code int}, so only checkpoint IDs up to {@link Integer#MAX_VALUE}
 * can be represented. Every read or write first consults the supplied {@link
 * LastStateConnectionStateListener} and fails with an {@link IllegalStateException} if the last
 * reported connection state is neither {@code CONNECTED} nor {@code RECONNECTED}.
 *
 * <p>{@link #start()} and {@link #shutdown(JobStatus)} are serialized via an internal lock.
 */
public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCheckpointIDCounter.class);

    /** Curator client used to register the connection listener and to delete the counter node. */
    private final CuratorFramework client;

    /** Shared counter holding the checkpoint ID. */
    private final SharedCount sharedCount;

    /** Listener that records the most recent connection state reported by Curator. */
    private final LastStateConnectionStateListener connectionStateListener;

    @GuardedBy("startStopLock")
    private boolean isStarted;

    private final Object startStopLock = new Object();

    /** Path of the counter node, as provided by {@link ZooKeeperUtils}. */
    private final String counterPath = ZooKeeperUtils.getCheckpointIdCounterPath();

    /**
     * Creates a counter for the given client. The counter is not usable until {@link #start()} has
     * been called.
     *
     * @param curatorFramework Curator client; must not be {@code null}
     * @param lastStateConnectionStateListener listener tracking the last connection state; must
     *     not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public ZooKeeperCheckpointIDCounter(CuratorFramework curatorFramework, LastStateConnectionStateListener lastStateConnectionStateListener) {
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "Curator client");
        // Fail fast here instead of with an anonymous NPE on the first get()/getAndIncrement().
        this.connectionStateListener = (LastStateConnectionStateListener) Preconditions.checkNotNull(lastStateConnectionStateListener, "Connection state listener");
        // 1 is the seed value handed to SharedCount for the counter.
        this.sharedCount = new SharedCount(this.client, this.counterPath, 1);
    }

    /**
     * Starts the shared counter and registers the connection state listener. Calling this method
     * again after a successful start is a no-op.
     *
     * @throws Exception if starting the underlying {@link SharedCount} fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public void start() throws Exception {
        synchronized (this.startStopLock) {
            if (this.isStarted) {
                return;
            }
            this.sharedCount.start();
            this.client.getConnectionStateListenable().addListener(this.connectionStateListener);
            this.isStarted = true;
        }
    }

    /**
     * Closes the shared counter, unregisters the connection state listener and, if the job reached
     * a globally terminal state, asynchronously deletes the counter node from ZooKeeper.
     *
     * <p>NOTE(review): on the globally-terminal path the started flag is left set, so a repeated
     * call issues the deletion again (an already missing node is treated as success). Confirm this
     * is the intended retry behavior before changing it.
     *
     * @param jobStatus status of the job this counter belongs to
     * @return a future that completes once shutdown (including the deletion, if any) has finished;
     *     it completes exceptionally if closing the counter or deleting the node fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
        synchronized (this.startStopLock) {
            if (!this.isStarted) {
                return FutureUtils.completedVoidFuture();
            }

            LOG.info("Shutting down.");
            try {
                this.sharedCount.close();
            } catch (IOException e) {
                return FutureUtils.completedExceptionally(e);
            }
            this.client.getConnectionStateListenable().removeListener(this.connectionStateListener);

            if (jobStatus.isGloballyTerminalState()) {
                LOG.info("Removing {} from ZooKeeper", this.counterPath);
                try {
                    CompletableFuture<Void> deletionFuture = new CompletableFuture<>();
                    this.client.delete().inBackground((curatorFramework, curatorEvent) -> {
                        handleDeletionOfCounterPath(curatorEvent, deletionFuture);
                    }).forPath(this.counterPath);
                    return deletionFuture;
                } catch (Exception e) {
                    return FutureUtils.completedExceptionally(e);
                }
            }

            this.isStarted = false;
            return FutureUtils.completedVoidFuture();
        }
    }

    /**
     * Background callback for the deletion of the counter node. Completes the given future
     * normally if the result code is {@code OK} or {@code NONODE}, and exceptionally otherwise.
     *
     * <p>The future is always completed: any unexpected runtime failure while inspecting the event
     * (including a violated sanity check) is forwarded to the future instead of escaping into the
     * callback thread, which would leave the caller of {@link #shutdown(JobStatus)} waiting forever.
     *
     * @param curatorEvent event describing the outcome of the delete operation
     * @param completableFuture future to complete with the outcome
     */
    private void handleDeletionOfCounterPath(CuratorEvent curatorEvent, CompletableFuture<Void> completableFuture) {
        try {
            Preconditions.checkArgument(curatorEvent.getType() == CuratorEventType.DELETE, "An unexpected CuratorEvent was monitored: " + curatorEvent.getType());
            Preconditions.checkArgument(this.counterPath.endsWith(curatorEvent.getPath()), "An unexpected path was selected for deletion: " + curatorEvent.getPath());

            KeeperException.Code code = KeeperException.Code.get(curatorEvent.getResultCode());
            // A node that is already gone is as good as a successful deletion.
            boolean nodeIsGone = Sets.immutableEnumSet(KeeperException.Code.OK, KeeperException.Code.NONODE).contains(code);
            if (nodeIsGone) {
                completableFuture.complete(null);
                return;
            }

            String fullPath = ZooKeeperUtils.generateZookeeperPath(this.client.getNamespace(), this.counterPath);
            completableFuture.completeExceptionally(new FlinkException(String.format("An error occurred while shutting down the CheckpointIDCounter in path '%s'.", fullPath), KeeperException.create(code, fullPath)));
        } catch (RuntimeException e) {
            completableFuture.completeExceptionally(e);
        }
    }

    /**
     * Atomically increments the counter, retrying until the versioned update succeeds.
     *
     * @return the counter value that was read before the successful increment
     * @throws IllegalStateException if the last known connection state is neither {@code
     *     CONNECTED} nor {@code RECONNECTED}
     * @throws Exception if the incremented value would be negative (counter overflow) or the
     *     update of the shared count fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public long getAndIncrement() throws Exception {
        while (true) {
            checkConnectionState();
            VersionedValue<Integer> current = this.sharedCount.getVersionedValue();
            int next = current.getValue().intValue() + 1;
            if (next < 0) {
                // int addition wrapped around
                throw new Exception("Checkpoint counter overflow. ZooKeeper checkpoint counter only supports checkpoints Ids up to " + Integer.MAX_VALUE);
            }
            // Only succeeds if nobody changed the count since it was read; otherwise retry.
            if (this.sharedCount.trySetCount(current, next)) {
                return current.getValue().intValue();
            }
        }
    }

    /**
     * Returns the current counter value as seen by the shared counter.
     *
     * @return the current count
     * @throws IllegalStateException if the last known connection state is neither {@code
     *     CONNECTED} nor {@code RECONNECTED}
     */
    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public long get() {
        checkConnectionState();
        return this.sharedCount.getVersionedValue().getValue().intValue();
    }

    /**
     * Sets the counter to the given value.
     *
     * @param j new counter value; must fit into an {@code int}
     * @throws IllegalStateException if the last known connection state is neither {@code
     *     CONNECTED} nor {@code RECONNECTED}
     * @throws IllegalArgumentException if {@code j} does not fit into an {@code int}
     * @throws Exception if updating the shared count fails
     */
    @Override // org.apache.flink.runtime.checkpoint.CheckpointIDCounter
    public void setCount(long j) throws Exception {
        checkConnectionState();
        if (j > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("ZooKeeper checkpoint counter only supports checkpoints Ids up to " + Integer.MAX_VALUE + ", but given value is " + j);
        }
        if (j < Integer.MIN_VALUE) {
            // Without this check the narrowing cast below would silently wrap the value.
            throw new IllegalArgumentException("ZooKeeper checkpoint counter stores the count as an int, but given value is " + j);
        }
        this.sharedCount.setCount((int) j);
    }

    /**
     * Verifies that the last connection state reported to the listener, if any, is {@code
     * CONNECTED} or {@code RECONNECTED}. No state being recorded yet is accepted.
     *
     * @throws IllegalStateException if a different state was last reported
     */
    private void checkConnectionState() {
        this.connectionStateListener.getLastState().ifPresent(connectionState -> {
            boolean connected = connectionState == ConnectionState.CONNECTED || connectionState == ConnectionState.RECONNECTED;
            if (!connected) {
                throw new IllegalStateException("Connection state: " + connectionState);
            }
        });
    }

    /** Returns the path of the counter node. */
    @VisibleForTesting
    String getPath() {
        return this.counterPath;
    }
}
