package org.apache.hadoop.hbase.master.zksyncer;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that mirrors the data of selected znodes from the cluster ZooKeeper (observed through
 * the inherited {@code watcher}) to a separate "client" ZooKeeper (reached through
 * {@code clientZkWatcher}).
 *
 * <p>For every path returned by {@link #getNodesToWatch()} a single-slot queue and a daemon
 * {@link ClientZkUpdater} thread are created by {@link #start()}. ZooKeeper events on the watched
 * paths place the latest data into the slot (replacing any value not yet consumed); the updater
 * thread takes it and writes it to the client ZooKeeper, retrying until it succeeds or the server
 * is stopped.
 */
@InterfaceAudience.Private
public abstract class ClientZKSyncer extends ZKListener {
    private static final Logger LOG = LoggerFactory.getLogger(ClientZKSyncer.class);

    /** Pause between two attempts to write to the client ZooKeeper, in milliseconds. */
    private static final long RETRY_SLEEP_MS = 200L;

    private final Server server;
    private final ZKWatcher clientZkWatcher;
    /** Watched znode path -> single-slot queue holding the newest data not yet written out. */
    private final Map<String, BlockingQueue<byte[]>> queues;

    /**
     * Thread that drains the queue of one znode and pushes each value to the client ZooKeeper.
     * It runs until the server reports it is stopped or the thread is interrupted.
     */
    class ClientZkUpdater extends Thread {
        final String znode;
        final BlockingQueue<byte[]> queue;

        /**
         * @param str           path of the znode this thread is responsible for
         * @param blockingQueue queue from which the data to publish is taken
         */
        public ClientZkUpdater(String str, BlockingQueue<byte[]> blockingQueue) {
            this.znode = str;
            this.queue = blockingQueue;
            setName("ClientZKUpdater-" + str);
        }

        @Override
        public void run() {
            while (!ClientZKSyncer.this.server.isStopped()) {
                try {
                    // Blocks until an event handler has offered new data for this znode.
                    byte[] data = this.queue.take();
                    ClientZKSyncer.this.setDataForClientZkUntilSuccess(this.znode, data);
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted while checking whether need to update meta location to client zk");
                    // Restore the interrupt flag for whoever inspects this thread next, then exit.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * @param watcher         watcher passed to the {@link ZKListener} superclass; events from it
     *                        drive the synchronization
     * @param clientZkWatcher watcher used for all writes to the client ZooKeeper
     * @param server          used to detect shutdown and to abort on initialization failure
     */
    public ClientZKSyncer(ZKWatcher watcher, ZKWatcher clientZkWatcher, Server server) {
        super(watcher);
        this.server = server;
        this.clientZkWatcher = clientZkWatcher;
        this.queues = new HashMap<>();
    }

    /**
     * Registers this listener, makes sure the base znode exists on the client ZooKeeper, and for
     * every node to watch creates its queue, starts its daemon updater thread and performs the
     * initial watch/read.
     *
     * @throws KeeperException if creating the base znode on the client ZooKeeper fails
     */
    public void start() throws KeeperException {
        LOG.debug("Starting {}", getClass().getSimpleName());
        this.watcher.registerListener(this);
        ZKUtil.createWithParents(this.clientZkWatcher, this.watcher.getZNodePaths().baseZNode);
        Collection<String> nodesToWatch = getNodesToWatch();
        LOG.debug("Znodes to watch: {}", nodesToWatch);
        for (String node : nodesToWatch) {
            // Capacity 1: only the most recent value matters, see upsertQueue.
            BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1);
            this.queues.put(node, queue);
            ClientZkUpdater updater = new ClientZkUpdater(node, queue);
            updater.setDaemon(true);
            updater.start();
            watchAndCheckExists(node);
        }
    }

    /**
     * Sets a watch on {@code str}. If the node exists its data is read and queued; if the read
     * yields no data the whole check is repeated. If the node does not exist, the node of the
     * same path is removed from the client ZooKeeper. Any {@link KeeperException} aborts the
     * server.
     */
    private void watchAndCheckExists(String str) {
        try {
            if (ZKUtil.watchAndCheckExists(this.watcher, str)) {
                byte[] data = ZKUtil.getDataAndWatch(this.watcher, str);
                if (data != null) {
                    upsertQueue(str, data);
                } else {
                    LOG.debug("Found no data from {}", str);
                    watchAndCheckExists(str);
                }
            } else {
                ZKUtil.deleteNodeFailSilent(this.clientZkWatcher, str);
            }
        } catch (KeeperException e) {
            this.server.abort("Unexpected exception during initialization, aborting", e);
        }
    }

    /**
     * Replaces whatever is pending in the queue of {@code str} with {@code bArr}, so the updater
     * thread only ever sees the newest value. Does nothing if no queue has been created for the
     * path (the listener is registered before {@link #start()} populates the queues).
     */
    private void upsertQueue(String str, byte[] bArr) {
        BlockingQueue<byte[]> queue = this.queues.get(str);
        if (queue == null) {
            // synchronized (null) would throw NullPointerException.
            LOG.debug("No update queue registered for {}, skipping", str);
            return;
        }
        // poll + offer must be atomic with respect to other event handlers for the same path.
        synchronized (queue) {
            queue.poll();
            queue.offer(bArr);
        }
    }

    /**
     * Writes {@code bArr} to znode {@code str} on the client ZooKeeper, retrying every
     * {@value #RETRY_SLEEP_MS} ms until the write succeeds or the server is stopped. A missing
     * node is created as {@link CreateMode#PERSISTENT}; connection loss or session expiration
     * triggers a reconnect of the client watcher before the next attempt.
     *
     * @param str  path of the znode to write
     * @param bArr data to store
     * @throws InterruptedException if reconnecting to the client ZooKeeper is interrupted
     */
    public final void setDataForClientZkUntilSuccess(String str, byte[] bArr) throws InterruptedException {
        while (!this.server.isStopped()) {
            try {
                LOG.debug("Set data for remote {}, client zk wather: {}", str, this.clientZkWatcher);
                ZKUtil.setData(this.clientZkWatcher, str, bArr);
                return;
            // The specific KeeperException subclasses must be caught before KeeperException
            // itself, otherwise these handlers are unreachable.
            } catch (KeeperException.NoNodeException nne) {
                // The node does not exist yet on the client ZooKeeper: create it with the data.
                try {
                    ZKUtil.createNodeIfNotExistsNoWatch(this.clientZkWatcher, str, bArr, CreateMode.PERSISTENT);
                    return;
                } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException ce) {
                    reconnectAfterExpiration();
                } catch (KeeperException ke) {
                    LOG.warn("Failed to create znode {} due to: {}, will retry later", str, ke.getMessage());
                }
            } catch (KeeperException.ConnectionLossException | KeeperException.SessionExpiredException ce) {
                reconnectAfterExpiration();
            } catch (KeeperException ke) {
                LOG.debug("Failed to set data to client ZK, will retry later", ke);
            }
            // Every failure path falls through to here: back off before the next attempt.
            Threads.sleep(RETRY_SLEEP_MS);
        }
    }

    /**
     * Asks the client watcher to reconnect. A failure to reconnect is only logged; the caller's
     * retry loop will try again.
     *
     * @throws InterruptedException if the reconnect is interrupted
     */
    private final void reconnectAfterExpiration() throws InterruptedException {
        LOG.warn("ZK session expired or lost. Retry a new connection...");
        try {
            this.clientZkWatcher.reconnectAfterExpiration();
        } catch (IOException | KeeperException e) {
            LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
        }
    }

    /**
     * Reads the data of a created node accepted by {@link #validate(String)} (re-setting the
     * watch) and queues it for publication. Also used by the data-changed and deleted handlers.
     */
    @Override
    public void nodeCreated(String str) {
        if (!validate(str)) {
            return;
        }
        try {
            byte[] data = ZKUtil.getDataAndWatch(this.watcher, str);
            if (data == null) {
                // BlockingQueue.offer(null) throws NullPointerException; nothing to publish.
                LOG.debug("Found no data from {}", str);
                return;
            }
            upsertQueue(str, data);
        } catch (KeeperException e) {
            LOG.warn("Unexpected exception handling nodeCreated event", e);
        }
    }

    /** Handles a data change exactly like a creation: re-read the node and queue its data. */
    @Override
    public void nodeDataChanged(String str) {
        if (validate(str)) {
            nodeCreated(str);
        }
    }

    /**
     * Re-establishes the watch on a deleted node; if the node already exists again its data is
     * queued through {@link #nodeCreated(String)}.
     */
    @Override
    public synchronized void nodeDeleted(String str) {
        if (!validate(str)) {
            return;
        }
        try {
            if (ZKUtil.watchAndCheckExists(this.watcher, str)) {
                nodeCreated(str);
            }
        } catch (KeeperException e) {
            LOG.warn("Unexpected exception handling nodeDeleted event for path: " + str, e);
        }
    }

    /**
     * @param str path of the znode an event was received for
     * @return whether events for this path should be processed by this syncer
     */
    abstract boolean validate(String str);

    /**
     * @return the znode paths for which {@link #start()} creates a queue and an updater thread
     */
    abstract Collection<String> getNodesToWatch();
}
