package org.apache.flink.runtime.leaderelection;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionService.class */
public class ZooKeeperLeaderElectionService implements LeaderElectionService, LeaderLatchListener, NodeCacheListener {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionService.class);
    private final CuratorFramework client;
    private final LeaderLatch leaderLatch;
    private final NodeCache cache;
    private final String leaderPath;
    private final Object lock = new Object();
    private final ConnectionStateListener listener = new ConnectionStateListener() { // from class: org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService.1
        @Override // org.apache.flink.shaded.org.apache.curator.framework.state.ConnectionStateListener
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            ZooKeeperLeaderElectionService.this.handleStateChange(connectionState);
        }
    };
    private UUID issuedLeaderSessionID = null;
    private volatile UUID confirmedLeaderSessionID = null;
    private volatile LeaderContender leaderContender = null;
    private volatile boolean running = false;

    public ZooKeeperLeaderElectionService(CuratorFramework curatorFramework, String str, String str2) {
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "CuratorFramework client");
        this.leaderPath = (String) Preconditions.checkNotNull(str2, "leaderPath");
        this.leaderLatch = new LeaderLatch(curatorFramework, str);
        this.cache = new NodeCache(curatorFramework, str2);
    }

    public UUID getLeaderSessionID() {
        return this.confirmedLeaderSessionID;
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionService
    public void start(LeaderContender leaderContender) throws Exception {
        Preconditions.checkNotNull(leaderContender, "Contender must not be null.");
        Preconditions.checkState(this.leaderContender == null, "Contender was already set.");
        LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
        synchronized (this.lock) {
            this.leaderContender = leaderContender;
            this.leaderLatch.addListener(this);
            this.leaderLatch.start();
            this.cache.getListenable().addListener(this);
            this.cache.start();
            this.client.getConnectionStateListenable().addListener(this.listener);
            this.running = true;
        }
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionService
    public void stop() throws Exception {
        synchronized (this.lock) {
            if (this.running) {
                this.running = false;
                this.confirmedLeaderSessionID = null;
                this.issuedLeaderSessionID = null;
                LOG.info("Stopping ZooKeeperLeaderElectionService {}.", this);
                this.client.getConnectionStateListenable().removeListener(this.listener);
                Exception exc = null;
                try {
                    this.cache.close();
                } catch (Exception e) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e, null);
                }
                try {
                    this.leaderLatch.close();
                } catch (Exception e2) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
                }
                if (exc != null) {
                    throw new Exception("Could not properly stop the ZooKeeperLeaderElectionService.", exc);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionService
    public void confirmLeaderSessionID(UUID uuid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Confirm leader session ID {} for leader {}.", uuid, this.leaderContender.getAddress());
        }
        Preconditions.checkNotNull(uuid);
        if (!this.leaderLatch.hasLeadership()) {
            LOG.warn("The leader session ID {} was confirmed even though thecorresponding JobManager was not elected as the leader.", uuid);
            return;
        }
        synchronized (this.lock) {
            if (!this.running) {
                LOG.debug("Ignoring the leader session Id {} confirmation, since the ZooKeeperLeaderElectionService has already been stopped.", uuid);
            } else if (uuid.equals(this.issuedLeaderSessionID)) {
                this.confirmedLeaderSessionID = uuid;
                writeLeaderInformation(this.confirmedLeaderSessionID);
            }
        }
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderElectionService
    public boolean hasLeadership() {
        return this.leaderLatch.hasLeadership();
    }

    @Override // org.apache.flink.shaded.org.apache.curator.framework.recipes.leader.LeaderLatchListener
    public void isLeader() {
        synchronized (this.lock) {
            if (this.running) {
                this.issuedLeaderSessionID = UUID.randomUUID();
                this.confirmedLeaderSessionID = null;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Grant leadership to contender {} with session ID {}.", this.leaderContender.getAddress(), this.issuedLeaderSessionID);
                }
                this.leaderContender.grantLeadership(this.issuedLeaderSessionID);
            } else {
                LOG.debug("Ignoring the grant leadership notification since the service has already been stopped.");
            }
        }
    }

    @Override // org.apache.flink.shaded.org.apache.curator.framework.recipes.leader.LeaderLatchListener
    public void notLeader() {
        synchronized (this.lock) {
            if (this.running) {
                this.issuedLeaderSessionID = null;
                this.confirmedLeaderSessionID = null;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Revoke leadership of {}.", this.leaderContender.getAddress());
                }
                this.leaderContender.revokeLeadership();
            } else {
                LOG.debug("Ignoring the revoke leadership notification since the service has already been stopped.");
            }
        }
    }

    @Override // org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener
    public void nodeChanged() throws Exception {
        try {
            if (this.leaderLatch.hasLeadership()) {
                synchronized (this.lock) {
                    if (this.running) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Leader node changed while {} is the leader with session ID {}.", this.leaderContender.getAddress(), this.confirmedLeaderSessionID);
                        }
                        if (this.confirmedLeaderSessionID != null) {
                            ChildData currentData = this.cache.getCurrentData();
                            if (currentData == null) {
                                if (LOG.isDebugEnabled()) {
                                    LOG.debug("Writing leader information into empty node by {}.", this.leaderContender.getAddress());
                                }
                                writeLeaderInformation(this.confirmedLeaderSessionID);
                            } else {
                                byte[] data = currentData.getData();
                                if (data == null || data.length == 0) {
                                    if (LOG.isDebugEnabled()) {
                                        LOG.debug("Writing leader information into node with empty data field by {}.", this.leaderContender.getAddress());
                                    }
                                    writeLeaderInformation(this.confirmedLeaderSessionID);
                                } else {
                                    ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data));
                                    String readUTF = objectInputStream.readUTF();
                                    UUID uuid = (UUID) objectInputStream.readObject();
                                    if (!readUTF.equals(this.leaderContender.getAddress()) || uuid == null || !uuid.equals(this.confirmedLeaderSessionID)) {
                                        if (LOG.isDebugEnabled()) {
                                            LOG.debug("Correcting leader information by {}.", this.leaderContender.getAddress());
                                        }
                                        writeLeaderInformation(this.confirmedLeaderSessionID);
                                    }
                                }
                            }
                        }
                    } else {
                        LOG.debug("Ignoring node change notification since the service has already been stopped.");
                    }
                }
            }
        } catch (Exception e) {
            this.leaderContender.handleError(new Exception("Could not handle node changed event.", e));
            throw e;
        }
    }

    protected void writeLeaderInformation(UUID uuid) {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Write leader information: Leader={}, session ID={}.", this.leaderContender.getAddress(), uuid);
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeUTF(this.leaderContender.getAddress());
            objectOutputStream.writeObject(uuid);
            objectOutputStream.close();
            boolean z = false;
            while (!z && this.leaderLatch.hasLeadership()) {
                Stat forPath = this.client.checkExists().forPath(this.leaderPath);
                if (forPath == null) {
                    try {
                        ((ACLBackgroundPathAndBytesable) this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)).forPath(this.leaderPath, byteArrayOutputStream.toByteArray());
                        z = true;
                    } catch (KeeperException.NodeExistsException e) {
                    }
                } else if (forPath.getEphemeralOwner() == this.client.getZookeeperClient().getZooKeeper().getSessionId()) {
                    try {
                        this.client.setData().forPath(this.leaderPath, byteArrayOutputStream.toByteArray());
                        z = true;
                    } catch (KeeperException.NoNodeException e2) {
                    }
                } else {
                    try {
                        this.client.delete().forPath(this.leaderPath);
                    } catch (KeeperException.NoNodeException e3) {
                    }
                }
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Successfully wrote leader information: Leader={}, session ID={}.", this.leaderContender.getAddress(), uuid);
            }
        } catch (Exception e4) {
            this.leaderContender.handleError(new Exception("Could not write leader address and leader session ID to ZooKeeper.", e4));
        }
    }

    protected void handleStateChange(ConnectionState connectionState) {
        switch (connectionState) {
            case CONNECTED:
                LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
                return;
            case SUSPENDED:
                LOG.warn("Connection to ZooKeeper suspended. The contender " + this.leaderContender.getAddress() + " no longer participates in the leader election.");
                return;
            case RECONNECTED:
                LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
                return;
            case LOST:
                LOG.warn("Connection to ZooKeeper lost. The contender " + this.leaderContender.getAddress() + " no longer participates in the leader election.");
                return;
            default:
                return;
        }
    }
}
