/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.zookeeper.KeeperException;

@InterfaceAudience.Private
public class ReplicationSourceManager
implements ReplicationListener {
    private static final Log LOG = LogFactory.getLog(ReplicationSourceManager.class);
    private final List<ReplicationSourceInterface> sources = new ArrayList<ReplicationSourceInterface>();
    private final List<ReplicationSourceInterface> oldsources;
    private final ReplicationQueues replicationQueues;
    private final ReplicationTracker replicationTracker;
    private final ReplicationPeers replicationPeers;
    private final UUID clusterId;
    private final Stoppable stopper;
    private final Map<String, SortedSet<String>> hlogsById;
    private final Configuration conf;
    private final FileSystem fs;
    private Path latestPath;
    private final Path logDir;
    private final Path oldLogDir;
    private final long sleepBeforeFailover;
    private final ThreadPoolExecutor executor;
    private final Random rand;

    public ReplicationSourceManager(ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf, Stoppable stopper, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId) {
        this.replicationQueues = replicationQueues;
        this.replicationPeers = replicationPeers;
        this.replicationTracker = replicationTracker;
        this.stopper = stopper;
        this.hlogsById = new HashMap<String, SortedSet<String>>();
        this.oldsources = new ArrayList<ReplicationSourceInterface>();
        this.conf = conf;
        this.fs = fs;
        this.logDir = logDir;
        this.oldLogDir = oldLogDir;
        this.sleepBeforeFailover = conf.getLong("replication.sleep.before.failover", 2000L);
        this.clusterId = clusterId;
        this.replicationTracker.registerListener((ReplicationListener)this);
        this.replicationPeers.getAllPeerIds();
        int nbWorkers = conf.getInt("replication.executor.workers", 1);
        this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
        tfb.setNameFormat("ReplicationExecutor-%d");
        this.executor.setThreadFactory(tfb.build());
        this.rand = new Random();
    }

    public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered, boolean holdLogInZK) {
        String fileName = log.getName();
        this.replicationQueues.setLogPosition(id, fileName, position);
        if (holdLogInZK) {
            return;
        }
        this.cleanOldLogs(fileName, id, queueRecovered);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cleanOldLogs(String key, String id, boolean queueRecovered) {
        Map<String, SortedSet<String>> map = this.hlogsById;
        synchronized (map) {
            SortedSet<String> hlogs = this.hlogsById.get(id);
            if (queueRecovered || hlogs.first().equals(key)) {
                return;
            }
            SortedSet<String> hlogSet = hlogs.headSet(key);
            for (String hlog : hlogSet) {
                this.replicationQueues.removeLog(id, hlog);
            }
            hlogSet.clear();
        }
    }

    public void init() throws IOException {
        for (String id : this.replicationPeers.getConnectedPeers()) {
            this.addSource(id);
        }
        List currentReplicators = this.replicationQueues.getListOfReplicators();
        if (currentReplicators == null || currentReplicators.size() == 0) {
            return;
        }
        List otherRegionServers = this.replicationTracker.getListOfRegionServers();
        LOG.info((Object)("Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers));
        for (String rs : currentReplicators) {
            if (otherRegionServers.contains(rs)) continue;
            this.transferQueues(rs);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReplicationSourceInterface addSource(String id) throws IOException {
        ReplicationSourceInterface src = this.getReplicationSource(this.conf, this.fs, this, this.replicationQueues, this.replicationPeers, this.stopper, id, this.clusterId);
        Map<String, SortedSet<String>> map = this.hlogsById;
        synchronized (map) {
            this.sources.add(src);
            this.hlogsById.put(id, new TreeSet());
            if (this.latestPath != null) {
                String name = this.latestPath.getName();
                this.hlogsById.get(id).add(name);
                try {
                    this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
                }
                catch (KeeperException ke) {
                    String message = "Cannot add log to zk for replication when creating a new source";
                    this.stopper.stop(message);
                    throw new IOException(message, ke);
                }
                src.enqueueLog(this.latestPath);
            }
        }
        src.startup();
        return src;
    }

    public void deleteSource(String peerId, boolean closeConnection) {
        this.replicationQueues.removeQueue(peerId);
        if (closeConnection) {
            this.replicationPeers.disconnectFromPeer(peerId);
        }
    }

    public void join() {
        this.executor.shutdown();
        if (this.sources.size() == 0) {
            this.replicationQueues.removeAllQueues();
        }
        for (ReplicationSourceInterface source : this.sources) {
            source.terminate("Region server is closing");
        }
    }

    protected Map<String, SortedSet<String>> getHLogs() {
        return Collections.unmodifiableMap(this.hlogsById);
    }

    public List<ReplicationSourceInterface> getSources() {
        return this.sources;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void preLogRoll(Path newLog) throws IOException {
        Map<String, SortedSet<String>> map = this.hlogsById;
        synchronized (map) {
            String name = newLog.getName();
            for (ReplicationSourceInterface replicationSourceInterface : this.sources) {
                try {
                    this.replicationQueues.addLog(replicationSourceInterface.getPeerClusterZnode(), name);
                }
                catch (KeeperException ke) {
                    throw new IOException("Cannot add log to zk for replication", ke);
                }
            }
            for (SortedSet sortedSet : this.hlogsById.values()) {
                if (this.sources.isEmpty()) {
                    sortedSet.clear();
                }
                sortedSet.add(name);
            }
        }
        this.latestPath = newLog;
    }

    void postLogRoll(Path newLog) throws IOException {
        for (ReplicationSourceInterface source : this.sources) {
            source.enqueueLog(newLog);
        }
    }

    public ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs, ReplicationSourceManager manager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Stoppable stopper, String peerId, UUID clusterId) throws IOException {
        ReplicationSourceInterface src;
        try {
            Class<?> c = Class.forName(conf.get("replication.replicationsource.implementation", ReplicationSource.class.getCanonicalName()));
            src = (ReplicationSourceInterface)c.newInstance();
        }
        catch (Exception e) {
            LOG.warn((Object)"Passed replication source implementation throws errors, defaulting to ReplicationSource", (Throwable)e);
            src = new ReplicationSource();
        }
        src.init(conf, fs, manager, replicationQueues, replicationPeers, stopper, peerId, clusterId);
        return src;
    }

    private void transferQueues(String rsZnode) {
        NodeFailoverWorker transfer = new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers, this.clusterId);
        try {
            this.executor.execute(transfer);
        }
        catch (RejectedExecutionException ex) {
            LOG.info((Object)("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage()));
        }
    }

    public void closeRecoveredQueue(ReplicationSourceInterface src) {
        LOG.info((Object)("Done with the recovered queue " + src.getPeerClusterZnode()));
        this.oldsources.remove(src);
        this.deleteSource(src.getPeerClusterZnode(), false);
    }

    public void removePeer(String id) {
        LOG.info((Object)("Closing the following queue " + id + ", currently have " + this.sources.size() + " and another " + this.oldsources.size() + " that were recovered"));
        String terminateMessage = "Replication stream was removed by a user";
        ReplicationSourceInterface srcToRemove = null;
        ArrayList<ReplicationSourceInterface> oldSourcesToDelete = new ArrayList<ReplicationSourceInterface>();
        for (ReplicationSourceInterface src : this.oldsources) {
            if (!id.equals(src.getPeerClusterId())) continue;
            oldSourcesToDelete.add(src);
        }
        for (ReplicationSourceInterface src : oldSourcesToDelete) {
            src.terminate(terminateMessage);
            this.closeRecoveredQueue(src);
        }
        LOG.info((Object)("Number of deleted recovered sources for " + id + ": " + oldSourcesToDelete.size()));
        for (ReplicationSourceInterface src : this.sources) {
            if (!id.equals(src.getPeerClusterId())) continue;
            srcToRemove = src;
            break;
        }
        if (srcToRemove == null) {
            LOG.error((Object)("The queue we wanted to close is missing " + id));
            return;
        }
        srcToRemove.terminate(terminateMessage);
        this.sources.remove(srcToRemove);
        this.deleteSource(id, true);
    }

    public void regionServerRemoved(String regionserver) {
        this.transferQueues(regionserver);
    }

    public void peerRemoved(String peerId) {
        this.removePeer(peerId);
    }

    public void peerListChanged(List<String> peerIds) {
        for (String id : peerIds) {
            try {
                boolean added = this.replicationPeers.connectToPeer(id);
                if (!added) continue;
                this.addSource(id);
            }
            catch (IOException e) {
                LOG.error((Object)"Error while adding a new peer", (Throwable)e);
            }
            catch (KeeperException e) {
                LOG.error((Object)"Error while adding a new peer", (Throwable)e);
            }
        }
    }

    public Path getOldLogDir() {
        return this.oldLogDir;
    }

    public Path getLogDir() {
        return this.logDir;
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public String getStats() {
        StringBuffer stats = new StringBuffer();
        for (ReplicationSourceInterface source : this.sources) {
            stats.append("Normal source for cluster " + source.getPeerClusterId() + ": ");
            stats.append(source.getStats() + "\n");
        }
        for (ReplicationSourceInterface oldSource : this.oldsources) {
            stats.append("Recovered source for cluster/machine(s) " + oldSource.getPeerClusterId() + ": ");
            stats.append(oldSource.getStats() + "\n");
        }
        return stats.toString();
    }

    class NodeFailoverWorker
    extends Thread {
        private String rsZnode;
        private final ReplicationQueues rq;
        private final ReplicationPeers rp;
        private final UUID clusterId;

        public NodeFailoverWorker(String rsZnode, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, UUID clusterId) {
            super("Failover-for-" + rsZnode);
            this.rsZnode = rsZnode;
            this.rq = replicationQueues;
            this.rp = replicationPeers;
            this.clusterId = clusterId;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(ReplicationSourceManager.this.sleepBeforeFailover + (long)(ReplicationSourceManager.this.rand.nextFloat() * (float)ReplicationSourceManager.this.sleepBeforeFailover));
            }
            catch (InterruptedException e) {
                LOG.warn((Object)"Interrupted while waiting before transferring a queue.");
                Thread.currentThread().interrupt();
            }
            if (ReplicationSourceManager.this.stopper.isStopped()) {
                LOG.info((Object)"Not transferring queue since we are shutting down");
                return;
            }
            SortedMap newQueues = null;
            newQueues = this.rq.claimQueues(this.rsZnode);
            if (newQueues.isEmpty()) {
                return;
            }
            for (Map.Entry entry : newQueues.entrySet()) {
                String peerId = (String)entry.getKey();
                try {
                    ReplicationSourceInterface src = ReplicationSourceManager.this.getReplicationSource(ReplicationSourceManager.this.conf, ReplicationSourceManager.this.fs, ReplicationSourceManager.this, this.rq, this.rp, ReplicationSourceManager.this.stopper, peerId, this.clusterId);
                    if (!this.rp.getConnectedPeers().contains(src.getPeerClusterId())) {
                        src.terminate("Recovered queue doesn't belong to any current peer");
                        break;
                    }
                    ReplicationSourceManager.this.oldsources.add(src);
                    for (String hlog : (SortedSet)entry.getValue()) {
                        src.enqueueLog(new Path(ReplicationSourceManager.this.oldLogDir, hlog));
                    }
                    src.startup();
                }
                catch (IOException e) {
                    LOG.error((Object)"Failed creating a source", (Throwable)e);
                }
            }
        }
    }
}

