package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Region-server side entry point for replication: implements both the source service
 * (shipping local edits via a {@link ReplicationSourceManager}) and the sink service
 * (applying entries received from another cluster via a {@link ReplicationSink}).
 *
 * <p>Lifecycle as driven by the methods below: {@link #initialize} wires up storage, peers,
 * tracker and the source manager; {@link #startReplicationService()} initializes the manager,
 * creates the sink and schedules periodic statistics logging; {@link #stopReplicationService()}
 * tears everything down again.
 */
@InterfaceAudience.Private
public class Replication implements ReplicationSourceService, ReplicationSinkService {
    private static final Logger LOG = LoggerFactory.getLogger(Replication.class);
    private boolean isReplicationForBulkLoadDataEnabled;
    private ReplicationSourceManager replicationManager;
    private ReplicationQueueStorage queueStorage;
    private ReplicationPeers replicationPeers;
    private ReplicationTracker replicationTracker;
    private Configuration conf;
    // Created in startReplicationService(); null before that point.
    private ReplicationSink replicationSink;
    private Server server;
    private ScheduledExecutorService scheduleThreadPool;
    private int statsThreadPeriod;
    private ReplicationLoad replicationLoad;
    private MetricsReplicationGlobalSourceSource globalMetricsSource;
    private PeerProcedureHandler peerProcedureHandler;

    /** Periodic task that writes the source manager's and the sink's statistics to the log. */
    private static final class ReplicationStatisticsTask implements Runnable {
        private final ReplicationSink replicationSink;
        private final ReplicationSourceManager replicationManager;

        public ReplicationStatisticsTask(ReplicationSink replicationSink, ReplicationSourceManager replicationSourceManager) {
            this.replicationManager = replicationSourceManager;
            this.replicationSink = replicationSink;
        }

        @Override
        public void run() {
            // An exception escaping a scheduleAtFixedRate task suppresses every later execution,
            // so a single failure here must not be allowed to end statistics logging for good.
            try {
                printStats(this.replicationManager.getStats());
                printStats(this.replicationSink.getStats());
            } catch (RuntimeException e) {
                LOG.warn("Failed to log replication statistics", e);
            }
        }

        /** Logs {@code str} at INFO level unless it is null or empty. */
        private void printStats(String str) {
            if (str == null || str.isEmpty()) {
                return;
            }
            LOG.info(str);
        }
    }

    /**
     * Builds all replication components for this server.
     *
     * @param server      hosting server; supplies the configuration, ZooKeeper handle and server name
     * @param fileSystem  passed through to the {@link ReplicationSourceManager}
     * @param path        passed through to the {@link ReplicationSourceManager}
     * @param path2       passed through to the {@link ReplicationSourceManager}
     * @param wALProvider optional WAL provider; when non-null it supplies the WAL file length
     *                    provider and gets a {@link ReplicationSourceWALActionListener} registered
     * @throws IOException              if the cluster id cannot be read or any component fails to build
     * @throws IllegalArgumentException if bulk-load replication is enabled but
     *                                  {@code hbase.replication.cluster.id} is null or empty
     */
    @Override
    public void initialize(Server server, FileSystem fileSystem, Path path, Path path2, WALProvider wALProvider) throws IOException {
        this.server = server;
        this.conf = this.server.getConfiguration();
        this.isReplicationForBulkLoadDataEnabled = ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
        // Validate configuration before allocating the executor so a rejected configuration
        // does not leave an orphan thread pool behind.
        if (this.isReplicationForBulkLoadDataEnabled) {
            String replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
            if (replicationClusterId == null || replicationClusterId.isEmpty()) {
                throw new IllegalArgumentException("hbase.replication.cluster.id cannot be null/empty when hbase.replication.bulkload.enabled is set to true.");
            }
        }
        this.scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d").setDaemon(true).build());
        try {
            this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), this.conf);
            this.replicationPeers = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
            this.replicationPeers.init();
            this.replicationTracker = ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
            UUID clusterId = readClusterId();
            this.globalMetricsSource = ((MetricsReplicationSourceFactory) CompatibilitySingletonFactory.getInstance(MetricsReplicationSourceFactory.class)).getGlobalSource();
            // Without a WAL provider, fall back to a length provider that never reports a length.
            this.replicationManager = new ReplicationSourceManager(this.queueStorage, this.replicationPeers, this.replicationTracker, this.conf, this.server, fileSystem, path, path2, clusterId, wALProvider != null ? wALProvider.getWALFileLengthProvider() : walPath -> OptionalLong.empty(), this.globalMetricsSource);
            if (wALProvider != null) {
                wALProvider.addWALActionsListener(new ReplicationSourceWALActionListener(this.conf, this.replicationManager));
            }
            this.statsThreadPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 300);
            LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
            this.replicationLoad = new ReplicationLoad();
            this.peerProcedureHandler = new PeerProcedureHandlerImpl(this.replicationManager);
        } catch (IOException e) {
            // Already the declared exception type (e.g. "Could not read cluster id");
            // rethrow as-is rather than burying it under a second, more generic IOException.
            throw e;
        } catch (Exception e) {
            throw new IOException("Failed replication handler create", e);
        }
    }

    /** Reads this cluster's UUID from ZooKeeper, translating ZooKeeper failures into IOException. */
    private UUID readClusterId() throws IOException {
        try {
            return ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
        } catch (KeeperException e) {
            throw new IOException("Could not read cluster id", e);
        }
    }

    /** @return the peer procedure handler created by {@link #initialize}, or null before that */
    @Override
    public PeerProcedureHandler getPeerProcedureHandler() {
        return this.peerProcedureHandler;
    }

    /** Stops the replication service; equivalent to {@link #join()}. */
    @Override
    public void stopReplicationService() {
        join();
    }

    /**
     * Joins the source manager, stops the sink and shuts down the statistics executor.
     * Components that were never created (initialization failed or the service was never
     * started) are skipped, and the executor is shut down even if stopping a component throws.
     */
    public void join() {
        try {
            if (this.replicationManager != null) {
                this.replicationManager.join();
            }
            if (this.replicationSink != null) {
                this.replicationSink.stopReplicationSinkServices();
            }
        } finally {
            if (this.scheduleThreadPool != null) {
                this.scheduleThreadPool.shutdown();
            }
        }
    }

    /**
     * Hands a batch of WAL entries received from another cluster to the sink.
     * All arguments are forwarded unchanged to {@code ReplicationSink.replicateEntries}.
     *
     * @throws IOException if the sink fails to apply the entries
     */
    @Override
    public void replicateLogEntries(List<AdminProtos.WALEntry> list, CellScanner cellScanner, String str, String str2, String str3) throws IOException {
        this.replicationSink.replicateEntries(list, cellScanner, str, str2, str3);
    }

    /**
     * Initializes the source manager, creates the sink and schedules the statistics task
     * to run every {@code statsThreadPeriod} seconds (first run after one full period).
     *
     * @throws IOException if the source manager or sink cannot be started
     */
    @Override
    public void startReplicationService() throws IOException {
        this.replicationManager.init();
        this.replicationSink = new ReplicationSink(this.conf);
        this.scheduleThreadPool.scheduleAtFixedRate(new ReplicationStatisticsTask(this.replicationSink, this.replicationManager), this.statsThreadPeriod, this.statsThreadPeriod, TimeUnit.SECONDS);
    }

    /** @return the source manager created by {@link #initialize}, or null before that */
    @Override
    public ReplicationSourceManager getReplicationManager() {
        return this.replicationManager;
    }

    /**
     * Adds hfile references for a bulk load to the replication queue via the source manager.
     * Failures are logged and then rethrown to the caller.
     *
     * @throws IOException if the references could not be added
     */
    public void addHFileRefsToQueue(TableName tableName, byte[] bArr, List<Pair<Path, Path>> list) throws IOException {
        try {
            this.replicationManager.addHFileRefs(tableName, bArr, list);
        } catch (IOException e) {
            LOG.error("Failed to add hfile references in the replication queue.", e);
            throw e;
        }
    }

    /**
     * Rebuilds and returns the replication load snapshot.
     *
     * @return the refreshed load, or null when replication has not been initialized or the
     *         sink has not been created yet (i.e. the service has not been started)
     */
    @Override
    public ReplicationLoad refreshAndGetReplicationLoad() {
        // The sink only exists after startReplicationService(); report "no load" until then
        // instead of failing with a NullPointerException while gathering sink metrics.
        if (this.replicationLoad == null || this.replicationSink == null) {
            return null;
        }
        buildReplicationLoad();
        return this.replicationLoad;
    }

    /** Feeds the manager's current and old sources plus the sink metrics into the load object. */
    private void buildReplicationLoad() {
        // NOTE(review): raw type kept because the element type returned by getSources() /
        // getOldSources() is not visible in this file - confirm and parameterize.
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(this.replicationManager.getSources());
        arrayList.addAll(this.replicationManager.getOldSources());
        this.replicationLoad.buildReplicationLoad(arrayList, this.replicationSink.getSinkMetrics());
    }
}
