package org.apache.flink.runtime.jobmanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.flink.shaded.org.apache.curator.utils.ZKPaths;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.class */
public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperSubmittedJobGraphStore.class);
    private final CuratorFramework client;
    private final ZooKeeperStateHandleStore<SubmittedJobGraph> jobGraphsInZooKeeper;
    private final PathChildrenCache pathCache;
    private final String zooKeeperFullBasePath;
    private SubmittedJobGraphStore.SubmittedJobGraphListener jobGraphListener;
    private boolean isRunning;
    private final Object cacheLock = new Object();
    private final Set<JobID> addedJobGraphs = new HashSet();

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore$SubmittedJobGraphsPathCacheListener.class */
    private final class SubmittedJobGraphsPathCacheListener implements PathChildrenCacheListener {
        private SubmittedJobGraphsPathCacheListener() {
        }

        @Override // org.apache.flink.shaded.org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
            if (ZooKeeperSubmittedJobGraphStore.LOG.isDebugEnabled()) {
                if (pathChildrenCacheEvent.getData() != null) {
                    ZooKeeperSubmittedJobGraphStore.LOG.debug("Received {} event (path: {})", pathChildrenCacheEvent.getType(), pathChildrenCacheEvent.getData().getPath());
                } else {
                    ZooKeeperSubmittedJobGraphStore.LOG.debug("Received {} event", pathChildrenCacheEvent.getType());
                }
            }
            switch (pathChildrenCacheEvent.getType()) {
                case CHILD_ADDED:
                    JobID fromEvent = fromEvent(pathChildrenCacheEvent);
                    ZooKeeperSubmittedJobGraphStore.LOG.debug("Received CHILD_ADDED event notification for job {}", fromEvent);
                    synchronized (ZooKeeperSubmittedJobGraphStore.this.cacheLock) {
                        try {
                            if (ZooKeeperSubmittedJobGraphStore.this.jobGraphListener != null && !ZooKeeperSubmittedJobGraphStore.this.addedJobGraphs.contains(fromEvent)) {
                                try {
                                    ZooKeeperSubmittedJobGraphStore.this.jobGraphListener.onAddedJobGraph(fromEvent);
                                } catch (Throwable th) {
                                    ZooKeeperSubmittedJobGraphStore.LOG.error("Error in callback", th);
                                }
                            }
                        } catch (Exception e) {
                            ZooKeeperSubmittedJobGraphStore.LOG.error("Error in SubmittedJobGraphsPathCacheListener", e);
                        }
                    }
                    return;
                case CHILD_UPDATED:
                default:
                    return;
                case CHILD_REMOVED:
                    JobID fromEvent2 = fromEvent(pathChildrenCacheEvent);
                    ZooKeeperSubmittedJobGraphStore.LOG.debug("Received CHILD_REMOVED event notification for job {}", fromEvent2);
                    synchronized (ZooKeeperSubmittedJobGraphStore.this.cacheLock) {
                        try {
                            if (ZooKeeperSubmittedJobGraphStore.this.jobGraphListener != null && ZooKeeperSubmittedJobGraphStore.this.addedJobGraphs.contains(fromEvent2)) {
                                try {
                                    ZooKeeperSubmittedJobGraphStore.this.jobGraphListener.onRemovedJobGraph(fromEvent2);
                                } catch (Throwable th2) {
                                    ZooKeeperSubmittedJobGraphStore.LOG.error("Error in callback", th2);
                                }
                            }
                        } catch (Exception e2) {
                            ZooKeeperSubmittedJobGraphStore.LOG.error("Error in SubmittedJobGraphsPathCacheListener", e2);
                            return;
                        }
                    }
                    return;
                case CONNECTION_SUSPENDED:
                    ZooKeeperSubmittedJobGraphStore.LOG.warn("ZooKeeper connection SUSPENDED. Changes to the submitted job graphs are not monitored (temporarily).");
                    return;
                case CONNECTION_LOST:
                    ZooKeeperSubmittedJobGraphStore.LOG.warn("ZooKeeper connection LOST. Changes to the submitted job graphs are not monitored (permanently).");
                    return;
                case CONNECTION_RECONNECTED:
                    ZooKeeperSubmittedJobGraphStore.LOG.info("ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored again.");
                    return;
                case INITIALIZED:
                    ZooKeeperSubmittedJobGraphStore.LOG.info("SubmittedJobGraphsPathCacheListener initialized");
                    return;
            }
        }

        private JobID fromEvent(PathChildrenCacheEvent pathChildrenCacheEvent) {
            return JobID.fromHexString(ZKPaths.getNodeFromPath(pathChildrenCacheEvent.getData().getPath()));
        }
    }

    public ZooKeeperSubmittedJobGraphStore(CuratorFramework curatorFramework, String str, RetrievableStateStorageHelper<SubmittedJobGraph> retrievableStateStorageHelper, Executor executor) throws Exception {
        Preconditions.checkNotNull(str, "Current jobs path");
        Preconditions.checkNotNull(retrievableStateStorageHelper, "State storage");
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "Curator client");
        curatorFramework.newNamespaceAwareEnsurePath(str).ensure(curatorFramework.getZookeeperClient());
        CuratorFramework usingNamespace = curatorFramework.usingNamespace(curatorFramework.getNamespace() + str);
        this.zooKeeperFullBasePath = curatorFramework.getNamespace() + str;
        this.jobGraphsInZooKeeper = new ZooKeeperStateHandleStore<>(usingNamespace, retrievableStateStorageHelper, executor);
        this.pathCache = new PathChildrenCache(usingNamespace, "/", false);
        this.pathCache.getListenable().addListener(new SubmittedJobGraphsPathCacheListener());
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void start(SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener) throws Exception {
        synchronized (this.cacheLock) {
            if (!this.isRunning) {
                this.jobGraphListener = submittedJobGraphListener;
                this.pathCache.start();
                this.isRunning = true;
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void stop() throws Exception {
        synchronized (this.cacheLock) {
            if (this.isRunning) {
                this.jobGraphListener = null;
                try {
                    try {
                        this.pathCache.close();
                        this.isRunning = false;
                    } catch (Throwable th) {
                        this.isRunning = false;
                        throw th;
                    }
                } catch (Exception e) {
                    throw new Exception("Could not properly stop the ZooKeeperSubmittedJobGraphStore.", e);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public SubmittedJobGraph recoverJobGraph(JobID jobID) throws Exception {
        SubmittedJobGraph retrieveState;
        Preconditions.checkNotNull(jobID, "Job ID");
        String pathForJob = getPathForJob(jobID);
        LOG.debug("Recovering job graph {} from {}{}.", new Object[]{jobID, this.zooKeeperFullBasePath, pathForJob});
        synchronized (this.cacheLock) {
            verifyIsRunning();
            try {
                try {
                    try {
                        retrieveState = this.jobGraphsInZooKeeper.getAndLock(pathForJob).retrieveState();
                        this.addedJobGraphs.add(retrieveState.getJobId());
                        LOG.info("Recovered {}.", retrieveState);
                        if (1 == 0) {
                            this.jobGraphsInZooKeeper.release(pathForJob);
                        }
                    } catch (IOException e) {
                        throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + pathForJob + ". This indicates that the retrieved state handle is broken. Try cleaning the state handle store.", e);
                    } catch (ClassNotFoundException e2) {
                        throw new FlinkException("Could not retrieve submitted JobGraph from state handle under " + pathForJob + ". This indicates that you are trying to recover from state written by an older Flink version which is not compatible. Try cleaning the state handle store.", e2);
                    }
                } catch (Throwable th) {
                    if (0 == 0) {
                        this.jobGraphsInZooKeeper.release(pathForJob);
                    }
                    throw th;
                }
            } catch (KeeperException.NoNodeException e3) {
                if (1 == 0) {
                    this.jobGraphsInZooKeeper.release(pathForJob);
                }
                return null;
            } catch (Exception e4) {
                throw new Exception("Could not retrieve the submitted job graph state handle for " + pathForJob + "from the submitted job graph store.", e4);
            }
        }
        return retrieveState;
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void putJobGraph(SubmittedJobGraph submittedJobGraph) throws Exception {
        Preconditions.checkNotNull(submittedJobGraph, "Job graph");
        String pathForJob = getPathForJob(submittedJobGraph.getJobId());
        LOG.debug("Adding job graph {} to {}{}.", new Object[]{submittedJobGraph.getJobId(), this.zooKeeperFullBasePath, pathForJob});
        boolean z = false;
        while (!z) {
            synchronized (this.cacheLock) {
                verifyIsRunning();
                int exists = this.jobGraphsInZooKeeper.exists(pathForJob);
                if (exists == -1) {
                    try {
                        this.jobGraphsInZooKeeper.addAndLock(pathForJob, submittedJobGraph);
                        this.addedJobGraphs.add(submittedJobGraph.getJobId());
                        z = true;
                    } catch (KeeperException.NodeExistsException e) {
                    }
                } else {
                    if (!this.addedJobGraphs.contains(submittedJobGraph.getJobId())) {
                        throw new IllegalStateException("Oh, no. Trying to update a graph you didn't #getAllSubmittedJobGraphs() or #putJobGraph() yourself before.");
                    }
                    try {
                        this.jobGraphsInZooKeeper.replace(pathForJob, exists, submittedJobGraph);
                        LOG.info("Updated {} in ZooKeeper.", submittedJobGraph);
                        z = true;
                    } catch (KeeperException.NoNodeException e2) {
                    }
                }
            }
        }
        LOG.info("Added {} to ZooKeeper.", submittedJobGraph);
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public void removeJobGraph(JobID jobID) throws Exception {
        Preconditions.checkNotNull(jobID, "Job ID");
        String pathForJob = getPathForJob(jobID);
        LOG.debug("Removing job graph {} from {}{}.", new Object[]{jobID, this.zooKeeperFullBasePath, pathForJob});
        synchronized (this.cacheLock) {
            if (this.addedJobGraphs.contains(jobID)) {
                this.jobGraphsInZooKeeper.releaseAndTryRemove(pathForJob);
                this.addedJobGraphs.remove(jobID);
            }
        }
        LOG.info("Removed job graph {} from ZooKeeper.", jobID);
    }

    @Override // org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
    public Collection<JobID> getJobIds() throws Exception {
        LOG.debug("Retrieving all stored job ids from ZooKeeper under {}.", this.zooKeeperFullBasePath);
        try {
            Collection<String> allPaths = this.jobGraphsInZooKeeper.getAllPaths();
            ArrayList arrayList = new ArrayList(allPaths.size());
            for (String str : allPaths) {
                try {
                    arrayList.add(jobIdfromPath(str));
                } catch (Exception e) {
                    LOG.warn("Could not parse job id from {}. This indicates a malformed path.", str, e);
                }
            }
            return arrayList;
        } catch (Exception e2) {
            throw new Exception("Failed to retrieve entry paths from ZooKeeperStateHandleStore.", e2);
        }
    }

    private void verifyIsRunning() {
        Preconditions.checkState(this.isRunning, "Not running. Forgot to call start()?");
    }

    public static String getPathForJob(JobID jobID) {
        Preconditions.checkNotNull(jobID, "Job ID");
        return String.format("/%s", jobID);
    }

    public static JobID jobIdfromPath(String str) {
        return JobID.fromHexString(str);
    }
}
