package org.apache.hadoop.hdfs.server.datanode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeStatus;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPServiceActor.class */
public class BPServiceActor implements Runnable {
    // Shared logger; it is assigned outside the portion of the file shown here.
    static final Logger LOG;
    // Address of the NameNode this actor talks to (set once in the constructor).
    final InetSocketAddress nnAddr;
    // NOTE(review): presumably the last HA state reported by the NameNode — not referenced in the visible code.
    HAServiceProtocol.HAServiceState state;
    // The block pool service this actor belongs to.
    final BPOfferService bpos;
    // Tracks when heartbeats, lifelines, block reports and outlier reports are due.
    private final Scheduler scheduler;
    // Main actor thread; isAlive() checks its liveness and the handshake renames it.
    Thread bpThread;
    // RPC proxy to the NameNode, created in connectToNNAndHandshake() or injected by tests.
    DatanodeProtocolClientSideTranslatorPB bpNamenode;
    // Optional identifiers used to build the RPC metric suffix; either may be null.
    private String serviceId;
    private String nnId;
    private final DataNode dn;
    private final DNConf dnConf;
    // Id of the most recently generated block report; seeded randomly in the constructor.
    private long prevBlockReportId;
    // Initialised to 0 in the constructor; NOTE(review): presumably 0 means "no lease held" — confirm.
    private long fullBlockReportLeaseId;
    // Copied from DNConf and exposed through getActorInfoMap().
    private final int maxDataLength;
    // Manages incremental block reports; its monitor is also used by the test trigger methods.
    private final IncrementalBlockReportManager ibrManager;
    private DatanodeRegistration bpRegistration;
    // Daemon thread (started in the constructor) that executes queued NameNode commands.
    private final CommandProcessingThread commandProcessingThread;
    // Null when no lifeline address was configured; otherwise awaited by the lifeline thread.
    private final CountDownLatch initialRegistrationComplete;
    // Null when no lifeline address was configured.
    private final LifelineSender lifelineSender;
    static final /* synthetic */ boolean $assertionsDisabled;
    // Monotonic timestamp (ms) of the last cache report that was sent.
    volatile long lastCacheReport = 0;
    private volatile RunningState runningState = RunningState.CONNECTING;
    // Cleared to ask the actor's threads to stop.
    private volatile boolean shouldServiceRun = true;
    // Sizes recorded for the RPCs of the most recent full block report.
    private final SortedSet<Integer> blockReportSizes = Collections.synchronizedSortedSet(new TreeSet());
    final LinkedList<BPServiceActorAction> bpThreadQueue = new LinkedList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$CommandProcessingThread.class */
    public class CommandProcessingThread extends Thread {
        private final BPServiceActor actor;
        private final BlockingQueue<Runnable> queue;

        CommandProcessingThread(BPServiceActor bPServiceActor) {
            super("Command processor");
            this.actor = bPServiceActor;
            this.queue = new LinkedBlockingQueue();
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    processQueue();
                    BPServiceActor.LOG.warn("Ending command processor service for: " + this);
                    BPServiceActor.this.shouldServiceRun = false;
                } catch (Throwable th) {
                    BPServiceActor.LOG.error("{} encountered fatal exception and exit.", getName(), th);
                    BPServiceActor.this.runningState = RunningState.FAILED;
                    BPServiceActor.LOG.warn("Ending command processor service for: " + this);
                    BPServiceActor.this.shouldServiceRun = false;
                }
            } catch (Throwable th2) {
                BPServiceActor.LOG.warn("Ending command processor service for: " + this);
                BPServiceActor.this.shouldServiceRun = false;
                throw th2;
            }
        }

        private void processQueue() {
            while (BPServiceActor.this.shouldRun()) {
                try {
                    this.queue.take().run();
                    BPServiceActor.this.dn.getMetrics().incrActorCmdQueueLength(-1);
                    BPServiceActor.this.dn.getMetrics().incrNumProcessedCommands();
                } catch (InterruptedException e) {
                    BPServiceActor.LOG.error("{} encountered interrupt and exit.", getName());
                    Thread.currentThread().interrupt();
                    if (Thread.interrupted()) {
                        break;
                    }
                }
            }
            BPServiceActor.this.dn.getMetrics().incrActorCmdQueueLength((-1) * this.queue.size());
            this.queue.clear();
        }

        private boolean processCommand(DatanodeCommand[] datanodeCommandArr) {
            if (datanodeCommandArr == null) {
                return true;
            }
            long monotonicNow = Time.monotonicNow();
            for (DatanodeCommand datanodeCommand : datanodeCommandArr) {
                try {
                } catch (IOException e) {
                    BPServiceActor.LOG.warn("Error processing datanode Command", e);
                } catch (RemoteException e2) {
                    String className = e2.getClassName();
                    if (UnregisteredNodeException.class.getName().equals(className) || DisallowedDatanodeException.class.getName().equals(className) || IncorrectVersionException.class.getName().equals(className)) {
                        BPServiceActor.LOG.warn("{} is shutting down", this, e2);
                        BPServiceActor.this.shouldServiceRun = false;
                        return false;
                    }
                }
                if (!BPServiceActor.this.bpos.processCommandFromActor(datanodeCommand, this.actor)) {
                    return false;
                }
            }
            long monotonicNow2 = Time.monotonicNow() - monotonicNow;
            if (datanodeCommandArr.length > 0) {
                BPServiceActor.this.dn.getMetrics().addNumProcessedCommands(monotonicNow2);
            }
            if (monotonicNow2 <= BPServiceActor.this.dnConf.getProcessCommandsThresholdMs()) {
                return true;
            }
            BPServiceActor.LOG.info("Took {} ms to process {} commands from NN", Long.valueOf(monotonicNow2), Integer.valueOf(datanodeCommandArr.length));
            return true;
        }

        void enqueue(DatanodeCommand datanodeCommand) throws InterruptedException {
            if (datanodeCommand == null) {
                return;
            }
            this.queue.put(() -> {
                processCommand(new DatanodeCommand[]{datanodeCommand});
            });
            BPServiceActor.this.dn.getMetrics().incrActorCmdQueueLength(1);
        }

        void enqueue(List<DatanodeCommand> list) throws InterruptedException {
            if (list == null) {
                return;
            }
            this.queue.put(() -> {
                processCommand((DatanodeCommand[]) list.toArray(new DatanodeCommand[list.size()]));
            });
            BPServiceActor.this.dn.getMetrics().incrActorCmdQueueLength(1);
        }

        void enqueue(DatanodeCommand[] datanodeCommandArr) throws InterruptedException {
            this.queue.put(() -> {
                processCommand(datanodeCommandArr);
            });
            BPServiceActor.this.dn.getMetrics().incrActorCmdQueueLength(1);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$LifelineSender.class */
    /**
     * Sends lifeline messages to the NameNode's lifeline address on a dedicated
     * daemon thread, independently of the actor's main thread.
     */
    public final class LifelineSender implements Runnable, Closeable {
        private final InetSocketAddress lifelineNnAddr;
        private Thread lifelineThread;
        private DatanodeLifelineProtocolClientSideTranslatorPB lifelineNamenode;

        /**
         * @param inetSocketAddress lifeline RPC address of the NameNode
         */
        public LifelineSender(InetSocketAddress inetSocketAddress) {
            this.lifelineNnAddr = inetSocketAddress;
        }

        /**
         * Interrupts the lifeline thread, waits for it to finish and then closes
         * the lifeline proxy. If the wait itself is interrupted, the interrupt
         * flag is restored and the proxy is still closed.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            stop();
            try {
                join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            IOUtils.cleanup((Log) null, new Closeable[]{this.lifelineNamenode});
        }

        /**
         * Waits for the initial registration latch, then loops: connect lazily,
         * send a lifeline if one is due, and sleep until the next one is due.
         */
        @Override // java.lang.Runnable
        public void run() {
            // Phase 1: block until the actor signals that registration is complete.
            while (BPServiceActor.this.shouldRun()) {
                try {
                    BPServiceActor.this.initialRegistrationComplete.await();
                    break;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            // Phase 2: periodic lifelines for as long as the actor should run.
            while (BPServiceActor.this.shouldRun()) {
                try {
                    if (this.lifelineNamenode == null) {
                        this.lifelineNamenode = BPServiceActor.this.dn.connectToLifelineNN(this.lifelineNnAddr);
                    }
                    sendLifelineIfDue();
                    // The wait time is "next due - now" and can be negative (for
                    // example when a due lifeline was skipped); Thread.sleep rejects
                    // negative values with IllegalArgumentException, so clamp it.
                    Thread.sleep(Math.max(0L, BPServiceActor.this.scheduler.getLifelineWaitTime()));
                } catch (IOException e2) {
                    BPServiceActor.LOG.warn("IOException in LifelineSender for " + BPServiceActor.this, e2);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                }
            }
            BPServiceActor.LOG.info("LifelineSender for " + BPServiceActor.this + " exiting.");
        }

        /**
         * Creates and starts the daemon lifeline thread. Uncaught exceptions on
         * that thread are logged.
         */
        public void start() {
            this.lifelineThread = new Thread(this, BPServiceActor.this.formatThreadName("lifeline", this.lifelineNnAddr));
            this.lifelineThread.setDaemon(true);
            this.lifelineThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread, Throwable th) {
                    BPServiceActor.LOG.error(thread + " terminating on unexpected exception", th);
                }
            });
            this.lifelineThread.start();
        }

        /** Interrupts the lifeline thread if it was started. */
        public void stop() {
            if (this.lifelineThread != null) {
                this.lifelineThread.interrupt();
            }
        }

        /**
         * Waits for the lifeline thread to terminate if it was started.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void join() throws InterruptedException {
            if (this.lifelineThread != null) {
                this.lifelineThread.join();
            }
        }

        /**
         * Sends a lifeline when the scheduler says one is due and heartbeats are
         * not disabled for tests; on success records the RPC time and schedules
         * the next lifeline.
         *
         * @throws IOException if the lifeline RPC fails
         */
        private void sendLifelineIfDue() throws IOException {
            final long startTime = BPServiceActor.this.scheduler.monotonicNow();
            if (!BPServiceActor.this.scheduler.isLifelineDue(startTime)) {
                if (BPServiceActor.LOG.isDebugEnabled()) {
                    BPServiceActor.LOG.debug("Skipping sending lifeline for " + BPServiceActor.this + ", because it is not due.");
                }
                return;
            }
            if (BPServiceActor.this.dn.areHeartbeatsDisabledForTests()) {
                if (BPServiceActor.LOG.isDebugEnabled()) {
                    BPServiceActor.LOG.debug("Skipping sending lifeline for " + BPServiceActor.this + ", because heartbeats are disabled for tests.");
                }
                return;
            }
            sendLifeline();
            BPServiceActor.this.dn.getMetrics().addLifeline(BPServiceActor.this.scheduler.monotonicNow() - startTime, BPServiceActor.this.getRpcMetricSuffix());
            BPServiceActor.this.scheduler.scheduleNextLifeline(BPServiceActor.this.scheduler.monotonicNow());
        }

        /**
         * Collects storage reports, cache usage, transfer counts and the volume
         * failure summary from the DataNode and sends them as one lifeline RPC.
         *
         * @throws IOException if gathering the reports or the RPC fails
         */
        public void sendLifeline() throws IOException {
            StorageReport[] storageReports = BPServiceActor.this.dn.getFSDataset().getStorageReports(BPServiceActor.this.bpos.getBlockPoolId());
            if (BPServiceActor.LOG.isDebugEnabled()) {
                BPServiceActor.LOG.debug("Sending lifeline with " + storageReports.length + " storage  reports from service actor: " + BPServiceActor.this);
            }
            VolumeFailureSummary volumeFailureSummary = BPServiceActor.this.dn.getFSDataset().getVolumeFailureSummary();
            // Without a summary the number of failed volumes is reported as 0.
            int failedVolumes = volumeFailureSummary != null ? volumeFailureSummary.getFailedStorageLocations().length : 0;
            this.lifelineNamenode.sendLifeline(BPServiceActor.this.bpRegistration, storageReports, BPServiceActor.this.dn.getFSDataset().getCacheCapacity(), BPServiceActor.this.dn.getFSDataset().getCacheUsed(), BPServiceActor.this.dn.getXmitsInProgress(), BPServiceActor.this.dn.getXceiverCount(), failedVolumes, volumeFailureSummary);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$RunningState.class */
    /** Lifecycle states of a {@code BPServiceActor}. */
    public enum RunningState {
        /** Initial state of a newly created actor. */
        CONNECTING,
        /** NOTE(review): presumably set when initialization fails — not assigned in the visible code. */
        INIT_FAILED,
        /** Together with {@link #CONNECTING}, the only state in which the actor reports itself alive. */
        RUNNING,
        /** NOTE(review): presumably set on a regular exit — not assigned in the visible code. */
        EXITED,
        /** Set when the command processing thread dies from an unexpected throwable. */
        FAILED;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hdfs/server/datanode/BPServiceActor$Scheduler.class */
    /**
     * Keeps the next-due and last-sent timestamps for heartbeats, lifelines,
     * block reports and outlier reports. All timestamps are taken from
     * {@link #monotonicNow()} and are in milliseconds; "due" checks use
     * subtraction so they compare elapsed time rather than absolute values.
     */
    public static class Scheduler {

        @VisibleForTesting
        volatile long nextLifelineTime;
        private final long heartbeatIntervalMs;
        private final long lifelineIntervalMs;
        private final long blockReportIntervalMs;
        private final long outliersReportIntervalMs;

        @VisibleForTesting
        volatile long nextBlockReportTime = monotonicNow();

        @VisibleForTesting
        volatile long nextHeartbeatTime = monotonicNow();

        @VisibleForTesting
        volatile long lastBlockReportTime = monotonicNow();

        @VisibleForTesting
        volatile long lastHeartbeatTime = monotonicNow();

        // When true, the next call to scheduleNextBlockReport() picks a fresh random
        // offset instead of advancing by whole intervals.
        @VisibleForTesting
        boolean resetBlockReportTime = true;

        @VisibleForTesting
        volatile long nextOutliersReportTime = monotonicNow();
        private final AtomicBoolean forceFullBlockReport = new AtomicBoolean(false);

        /**
         * @param j  heartbeat interval in milliseconds
         * @param j2 lifeline interval in milliseconds
         * @param j3 block report interval in milliseconds
         * @param j4 outliers report interval in milliseconds
         */
        Scheduler(long j, long j2, long j3, long j4) {
            this.heartbeatIntervalMs = j;
            this.lifelineIntervalMs = j2;
            this.blockReportIntervalMs = j3;
            this.outliersReportIntervalMs = j4;
            scheduleNextLifeline(this.nextHeartbeatTime);
        }

        /**
         * Makes a heartbeat due immediately and pushes the lifeline out accordingly.
         *
         * @return the new next-heartbeat time
         */
        long scheduleHeartbeat() {
            this.nextHeartbeatTime = monotonicNow();
            scheduleNextLifeline(this.nextHeartbeatTime);
            return this.nextHeartbeatTime;
        }

        /**
         * Schedules the next heartbeat one heartbeat interval from now and pushes
         * the lifeline out accordingly.
         *
         * @return the new next-heartbeat time
         */
        long scheduleNextHeartbeat() {
            this.nextHeartbeatTime = monotonicNow() + this.heartbeatIntervalMs;
            scheduleNextLifeline(this.nextHeartbeatTime);
            return this.nextHeartbeatTime;
        }

        /** Records the time of the last heartbeat. */
        void updateLastHeartbeatTime(long j) {
            this.lastHeartbeatTime = j;
        }

        /** Records the time of the last block report. */
        void updateLastBlockReportTime(long j) {
            this.lastBlockReportTime = j;
        }

        /** Schedules the next outliers report one outliers interval from now. */
        void scheduleNextOutlierReport() {
            this.nextOutliersReportTime = monotonicNow() + this.outliersReportIntervalMs;
        }

        /** @return whole seconds elapsed since the last recorded heartbeat */
        long getLastHearbeatTime() {
            return (monotonicNow() - this.lastHeartbeatTime) / 1000;
        }

        /** @return whole seconds elapsed since the last recorded block report */
        long getLastBlockReportTime() {
            return (monotonicNow() - this.lastBlockReportTime) / 1000;
        }

        /**
         * Schedules the next lifeline one lifeline interval after the given time.
         *
         * @param j base time in milliseconds
         * @return the new next-lifeline time
         */
        long scheduleNextLifeline(long j) {
            this.nextLifelineTime = j + this.lifelineIntervalMs;
            return this.nextLifelineTime;
        }

        /** @return true if the next heartbeat time is at or before {@code j} */
        boolean isHeartbeatDue(long j) {
            return this.nextHeartbeatTime - j <= 0;
        }

        /** @return true if the next lifeline time is at or before {@code j} */
        boolean isLifelineDue(long j) {
            return this.nextLifelineTime - j <= 0;
        }

        /** @return true if the next block report time is at or before {@code j} */
        boolean isBlockReportDue(long j) {
            return this.nextBlockReportTime - j <= 0;
        }

        /** @return true if the next outliers report time is at or before {@code j} */
        boolean isOutliersReportDue(long j) {
            return this.nextOutliersReportTime - j <= 0;
        }

        /** Raises the force-full-block-report flag and requests a schedule reset. */
        void forceFullBlockReportNow() {
            this.forceFullBlockReport.set(true);
            this.resetBlockReportTime = true;
        }

        /**
         * Schedules the next block report at a random point within {@code j}
         * milliseconds from now, or immediately when {@code j} is not positive.
         *
         * @param j maximum delay in milliseconds
         * @return the new next-block-report time
         */
        public long scheduleBlockReport(long j) {
            if (j > 0) {
                // nextLong keeps the full long range; casting the bound to int would
                // turn large delays into a non-positive bound and throw.
                this.nextBlockReportTime = monotonicNow() + ThreadLocalRandom.current().nextLong(j);
            } else {
                this.nextBlockReportTime = monotonicNow();
            }
            this.resetBlockReportTime = true;
            return this.nextBlockReportTime;
        }

        /**
         * Schedules the block report after the one that was just sent. After a
         * reset, a random offset within one block report interval is chosen;
         * otherwise the previous target is advanced by the smallest whole number
         * of intervals that places it in the future.
         */
        void scheduleNextBlockReport() {
            if (this.resetBlockReportTime) {
                // See scheduleBlockReport: nextLong avoids truncating the interval.
                this.nextBlockReportTime = monotonicNow() + ThreadLocalRandom.current().nextLong(this.blockReportIntervalMs);
                this.resetBlockReportTime = false;
            } else {
                this.nextBlockReportTime += (((monotonicNow() - this.nextBlockReportTime) + this.blockReportIntervalMs) / this.blockReportIntervalMs) * this.blockReportIntervalMs;
            }
        }

        /** @return milliseconds until the next heartbeat; not positive if one is already due */
        long getHeartbeatWaitTime() {
            return this.nextHeartbeatTime - monotonicNow();
        }

        /** @return milliseconds until the next lifeline; not positive if one is already due */
        long getLifelineWaitTime() {
            return this.nextLifelineTime - monotonicNow();
        }

        /** @return the current monotonic time in milliseconds; overridable for tests */
        @VisibleForTesting
        public long monotonicNow() {
            return Time.monotonicNow();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the actor for one NameNode and starts its command processing thread.
     *
     * @param str service id used in RPC metric names; may be null
     * @param str2 NameNode id used in RPC metric names; may be null
     * @param inetSocketAddress RPC address of the NameNode
     * @param inetSocketAddress2 lifeline address of the NameNode; when null, no
     *        lifeline sender and no registration latch are created
     * @param bPOfferService the block pool service this actor belongs to
     */
    public BPServiceActor(String str, String str2, InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, BPOfferService bPOfferService) {
        this.bpos = bPOfferService;
        this.dn = bPOfferService.getDataNode();
        this.nnAddr = inetSocketAddress;

        // The lifeline sender and its registration latch exist only together.
        if (inetSocketAddress2 == null) {
            this.lifelineSender = null;
            this.initialRegistrationComplete = null;
        } else {
            this.lifelineSender = new LifelineSender(inetSocketAddress2);
            this.initialRegistrationComplete = new CountDownLatch(1);
        }

        this.dnConf = this.dn.getDnConf();
        this.ibrManager = new IncrementalBlockReportManager(this.dnConf.ibrInterval, this.dn.getMetrics());
        this.prevBlockReportId = ThreadLocalRandom.current().nextLong();
        this.fullBlockReportLeaseId = 0L;
        this.scheduler = new Scheduler(
                this.dnConf.heartBeatInterval,
                this.dnConf.getLifelineIntervalMs(),
                this.dnConf.blockReportInterval,
                this.dnConf.outliersReportIntervalMs);
        this.maxDataLength = this.dnConf.getMaxDataLength();

        // Both ids simply stay null when the caller does not supply them.
        this.serviceId = str;
        this.nnId = str2;

        this.commandProcessingThread = new CommandProcessingThread(this);
        this.commandProcessingThread.start();
    }

    /**
     * @return the registration currently held by this actor; may be null if no
     *         registration has been stored yet
     */
    public DatanodeRegistration getBpRegistration() {
        return this.bpRegistration;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the incremental block report manager owned by this actor */
    public IncrementalBlockReportManager getIbrManager() {
        return this.ibrManager;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether this actor is still operating.
     *
     * @return true only if the service has not been asked to stop, the actor
     *         thread is alive, and the state is RUNNING or CONNECTING
     */
    public boolean isAlive() {
        if (!this.shouldServiceRun || !this.bpThread.isAlive()) {
            return false;
        }
        return this.runningState == RunningState.RUNNING
                || this.runningState == RunningState.CONNECTING;
    }

    /** @return the current running state rendered as a string */
    String getRunningState() {
        return this.runningState.toString();
    }

    /** @return the owning block pool service's description followed by the NameNode address */
    @Override
    public String toString() {
        final String owner = this.bpos.toString();
        return owner + " service to " + this.nnAddr;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the address of the NameNode this actor was created for */
    public InetSocketAddress getNNSocketAddress() {
        return this.nnAddr;
    }

    /** @return the NameNode address formatted by {@code NetUtils.getHostPortString} */
    private String getNameNodeAddress() {
        final InetSocketAddress address = getNNSocketAddress();
        return NetUtils.getHostPortString(address);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a snapshot of this actor's status as string key/value pairs:
     * NameNode address, block pool id, running state, seconds since the last
     * heartbeat and block report, the largest recorded block report size and
     * the configured maximum data length.
     *
     * @return a new mutable map holding the snapshot
     */
    public Map<String, String> getActorInfoMap() {
        final Map<String, String> info = new HashMap<>();
        info.put("NamenodeAddress", getNameNodeAddress());
        info.put("BlockPoolID", this.bpos.getBlockPoolId());
        info.put("ActorState", getRunningState());

        final long secondsSinceHeartbeat = getScheduler().getLastHearbeatTime();
        info.put("LastHeartbeat", String.valueOf(secondsSinceHeartbeat));
        final long secondsSinceBlockReport = getScheduler().getLastBlockReportTime();
        info.put("LastBlockReport", String.valueOf(secondsSinceBlockReport));

        info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
        info.put("maxDataLength", String.valueOf(this.maxDataLength));
        return info;
    }

    /**
     * Replaces the NameNode RPC proxy used by this actor (test hook).
     *
     * @param datanodeProtocolClientSideTranslatorPB the proxy to use from now on
     */
    @VisibleForTesting
    void setNameNode(DatanodeProtocolClientSideTranslatorPB datanodeProtocolClientSideTranslatorPB) {
        this.bpNamenode = datanodeProtocolClientSideTranslatorPB;
    }

    /** @return the NameNode RPC proxy currently in use; may be null before it is set */
    @VisibleForTesting
    DatanodeProtocolClientSideTranslatorPB getNameNodeProxy() {
        return this.bpNamenode;
    }

    /**
     * Replaces the lifeline RPC proxy (test hook). The lifeline sender is
     * dereferenced directly, so this fails with a NullPointerException when the
     * actor was created without a lifeline address.
     *
     * @param datanodeLifelineProtocolClientSideTranslatorPB the proxy to use from now on
     */
    @VisibleForTesting
    void setLifelineNameNode(DatanodeLifelineProtocolClientSideTranslatorPB datanodeLifelineProtocolClientSideTranslatorPB) {
        this.lifelineSender.lifelineNamenode = datanodeLifelineProtocolClientSideTranslatorPB;
    }

    /**
     * Returns the lifeline RPC proxy (test hook). The lifeline sender is
     * dereferenced directly, so this fails with a NullPointerException when the
     * actor was created without a lifeline address.
     *
     * @return the lifeline proxy, or null if none has been created or set yet
     */
    @VisibleForTesting
    DatanodeLifelineProtocolClientSideTranslatorPB getLifelineNameNodeProxy() {
        return this.lifelineSender.lifelineNamenode;
    }

    /**
     * Asks the NameNode for its namespace info, retrying every five seconds for
     * as long as the actor should run, and then validates the NameNode version.
     *
     * @return the namespace info reported by the NameNode
     * @throws IOException if no namespace info was obtained before the actor
     *         stopped, or if the NameNode version is below the configured minimum
     */
    @VisibleForTesting
    NamespaceInfo retrieveNamespaceInfo() throws IOException {
        NamespaceInfo nsInfo = null;
        while (shouldRun()) {
            try {
                nsInfo = this.bpNamenode.versionRequest();
                LOG.debug(this + " received versionRequest response: " + nsInfo);
                break;
            } catch (IOException e) {
                // Timeouts (SocketTimeoutException is an IOException) and every other
                // I/O failure are handled the same way: warn, pause, try again.
                LOG.warn("Problem connecting to server: " + this.nnAddr);
                sleepAndLogInterrupts(5000, "requesting version info from NN");
            }
        }

        if (nsInfo != null) {
            checkNNVersion(nsInfo);
            return nsInfo;
        }
        throw new IOException("DN shut down before block pool connected");
    }

    /**
     * Verifies that the NameNode's software version is not older than the
     * configured minimum, and logs a note when it differs from this DataNode's
     * own version.
     *
     * @param namespaceInfo namespace info carrying the NameNode's software version
     * @throws IncorrectVersionException if the NameNode version is below the minimum
     */
    private void checkNNVersion(NamespaceInfo namespaceInfo) throws IncorrectVersionException {
        final String nnVersion = namespaceInfo.getSoftwareVersion();
        final String minimumNnVersion = this.dnConf.getMinimumNameNodeVersion();

        final boolean tooOld = VersionUtil.compareVersions(nnVersion, minimumNnVersion) < 0;
        if (tooOld) {
            IncorrectVersionException failure =
                    new IncorrectVersionException(minimumNnVersion, nnVersion, "NameNode", "DataNode");
            LOG.warn(failure.getMessage());
            throw failure;
        }

        final String dnVersion = VersionInfo.getVersion();
        if (!nnVersion.equals(dnVersion)) {
            LOG.info("Reported NameNode version '" + nnVersion + "' does not match DataNode version '" + dnVersion + "' but is within acceptable limits. Note: This is normal during a rolling upgrade.");
        }
    }

    /**
     * Performs the initial handshake with the NameNode: creates the RPC proxy,
     * fetches and verifies the namespace info, renames the actor thread and
     * finally registers with the NameNode.
     *
     * @throws IOException if any step of the handshake fails
     */
    private void connectToNNAndHandshake() throws IOException {
        // Step 1: obtain the RPC proxy.
        this.bpNamenode = this.dn.connectToNN(this.nnAddr);

        // Step 2: fetch the namespace info and let the block pool service verify it.
        final NamespaceInfo nsInfo = retrieveNamespaceInfo();
        this.bpos.verifyAndSetNamespaceInfo(this, nsInfo);

        // Step 3: give the thread its heartbeating name, then register.
        final String threadName = formatThreadName("heartbeating", this.nnAddr);
        this.bpThread.setName(threadName);
        register(nsInfo);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook: makes a heartbeat due now, forces a full block report, wakes
     * waiters on the IBR manager's monitor, and then blocks until the scheduled
     * block report time changes. Returns early if the waiting thread is
     * interrupted.
     */
    @VisibleForTesting
    public void triggerBlockReportForTests() {
        synchronized (this.ibrManager) {
            this.scheduler.scheduleHeartbeat();
            final long reportTimeBeforeTrigger = this.scheduler.nextBlockReportTime;
            this.scheduler.forceFullBlockReportNow();
            this.ibrManager.notifyAll();
            try {
                // Re-check every 100 ms until the next-report time has changed.
                while (this.scheduler.nextBlockReportTime == reportTimeBeforeTrigger) {
                    this.ibrManager.wait(100L);
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: give up and return.
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Test hook: makes a heartbeat due now, wakes waiters on the IBR manager's
     * monitor, and then blocks until the next heartbeat time has moved past the
     * value scheduled here. Returns early if the waiting thread is interrupted.
     */
    @VisibleForTesting
    public void triggerHeartbeatForTests() {
        synchronized (this.ibrManager) {
            final long triggeredAt = this.scheduler.scheduleHeartbeat();
            this.ibrManager.notifyAll();
            try {
                // Re-check every 100 ms until a later heartbeat has been scheduled.
                while (triggeredAt - this.scheduler.nextHeartbeatTime >= 0) {
                    this.ibrManager.wait(100L);
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting: give up and return.
                return;
            }
        }
    }

    /**
     * Returns the largest size recorded for the most recent block report.
     *
     * <p>The emptiness check and the lookup run under the set's own monitor,
     * which is the lock the synchronized wrapper uses for every operation.
     * Without it, a concurrent {@code clear()} between the two calls would make
     * {@code last()} throw {@code NoSuchElementException}.
     *
     * @return the largest recorded size, or 0 when nothing has been recorded
     */
    private int getMaxBlockReportSize() {
        synchronized (this.blockReportSizes) {
            if (this.blockReportSizes.isEmpty()) {
                return 0;
            }
            return this.blockReportSizes.last().intValue();
        }
    }

    /**
     * Produces the id for the next block report: the previous id plus one,
     * except that 0 is never handed out — a random non-zero value is drawn
     * instead.
     *
     * @return the new id, which is also remembered as the previous id
     */
    private long generateUniqueBlockReportId() {
        long nextId = this.prevBlockReportId + 1;
        while (nextId == 0) {
            nextId = ThreadLocalRandom.current().nextLong();
        }
        this.prevBlockReportId = nextId;
        return nextId;
    }

    /**
     * Sends a full block report for this block pool to the NameNode.
     *
     * <p>Pending incremental reports are flushed first. When the total number
     * of blocks is below the configured split threshold, all storages go out in
     * one RPC; otherwise one RPC is sent per storage. Timing metrics and a
     * summary line are recorded exactly once, whether or not sending succeeded;
     * the next report is scheduled only after a successful send.
     *
     * @param j value passed through as the fourth argument of every
     *          {@code BlockReportContext}
     * @return the commands returned by the NameNode, or {@code null} if there were none
     * @throws IOException if any RPC fails
     */
    List<DatanodeCommand> blockReport(long j) throws IOException {
        final List<DatanodeCommand> cmds = new ArrayList<>();
        this.ibrManager.sendIBRs(this.bpNamenode, this.bpRegistration, this.bpos.getBlockPoolId(), getRpcMetricSuffix());

        // Build one StorageBlockReport per storage and count all blocks.
        final long brCreateStartTime = Time.monotonicNow();
        final Map<DatanodeStorage, BlockListAsLongs> perStorageBlockLists = this.dn.getFSDataset().getBlockReports(this.bpos.getBlockPoolId());
        final StorageBlockReport[] reports = new StorageBlockReport[perStorageBlockLists.size()];
        int reportIndex = 0;
        int totalBlockCount = 0;
        for (Map.Entry<DatanodeStorage, BlockListAsLongs> entry : perStorageBlockLists.entrySet()) {
            BlockListAsLongs blockList = entry.getValue();
            reports[reportIndex++] = new StorageBlockReport(entry.getKey(), blockList);
            totalBlockCount += blockList.getNumberOfBlocks();
        }

        int numReportsSent = 0;
        int numRPCs = 0;
        boolean success = false;
        final long brSendStartTime = Time.monotonicNow();
        final long reportId = generateUniqueBlockReportId();
        final boolean useBlocksBuffer = this.bpRegistration.getNamespaceInfo().isCapabilitySupported(NamespaceInfo.Capability.STORAGE_BLOCK_REPORT_BUFFERS);
        this.blockReportSizes.clear();
        try {
            if (totalBlockCount < this.dnConf.blockReportSplitThreshold) {
                // Small enough: everything in a single RPC.
                DatanodeCommand cmd = this.bpNamenode.blockReport(this.bpRegistration, this.bpos.getBlockPoolId(), reports, new BlockReportContext(1, 0, reportId, j, true));
                this.blockReportSizes.add(Integer.valueOf(calculateBlockReportPBSize(useBlocksBuffer, reports)));
                numRPCs = 1;
                numReportsSent = reports.length;
                if (cmd != null) {
                    cmds.add(cmd);
                }
            } else {
                // Too large: one RPC per storage.
                for (int r = 0; r < reports.length; r++) {
                    StorageBlockReport[] singleReport = {reports[r]};
                    DatanodeCommand cmd = this.bpNamenode.blockReport(this.bpRegistration, this.bpos.getBlockPoolId(), singleReport, new BlockReportContext(reports.length, r, reportId, j, true));
                    this.blockReportSizes.add(Integer.valueOf(calculateBlockReportPBSize(useBlocksBuffer, singleReport)));
                    numReportsSent++;
                    numRPCs++;
                    if (cmd != null) {
                        cmds.add(cmd);
                    }
                }
            }
            success = true;
        } finally {
            // Record the timing and the summary once, for success and failure alike.
            final long brSendCost = Time.monotonicNow() - brSendStartTime;
            final long brCreateCost = brSendStartTime - brCreateStartTime;
            this.dn.getMetrics().addBlockReport(brSendCost, getRpcMetricSuffix());
            final int nCmds = cmds.size();
            LOG.info((success ? "S" : "Uns") + "uccessfully sent block report 0x" + Long.toHexString(reportId) + ",  containing " + reports.length + " storage report(s), of which we sent " + numReportsSent + ". The reports had " + totalBlockCount + " total blocks and used " + numRPCs + " RPC(s). This took " + brCreateCost + " msec to generate and " + brSendCost + " msecs for RPC and NN processing. Got back " + (nCmds == 0 ? "no commands" : nCmds == 1 ? "one command: " + cmds.get(0) : nCmds + " commands: " + Joiner.on("; ").join(cmds)) + ".");
        }

        // Reached only when every RPC succeeded.
        this.scheduler.updateLastBlockReportTime(Time.monotonicNow());
        this.scheduler.scheduleNextBlockReport();
        return cmds.isEmpty() ? null : cmds;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the suffix appended to RPC metric names from the service id and
     * the NameNode id.
     *
     * @return {@code null} if neither id is set, the single id if only one is
     *         set, or {@code serviceId + "-" + nnId} if both are set
     */
    public String getRpcMetricSuffix() {
        if (this.serviceId == null) {
            // Covers both "only nnId set" and "neither set" (nnId is null then).
            return this.nnId;
        }
        if (this.nnId == null) {
            return this.serviceId;
        }
        return this.serviceId + "-" + this.nnId;
    }

    /**
     * Sends a cache report for this actor's block pool when one is due.
     *
     * <p>No report is sent when the dataset's cache capacity is zero or when the time since
     * the last cache report does not exceed {@code dnConf.cacheReportInterval}.
     *
     * @return the command returned by the cache-report RPC, or {@code null} when no report was sent
     * @throws IOException if gathering or sending the report fails
     */
    DatanodeCommand cacheReport() throws IOException {
        if (this.dn.getFSDataset().getCacheCapacity() == 0) {
            return null;
        }
        final long startTime = Time.monotonicNow();
        if (startTime - this.lastCacheReport <= this.dnConf.cacheReportInterval) {
            // Interval has not elapsed yet.
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending cacheReport from service actor: " + this);
        }
        this.lastCacheReport = startTime;

        final String bpid = this.bpos.getBlockPoolId();
        final List<Long> cachedBlockIds = this.dn.getFSDataset().getCacheReport(bpid);
        final long rpcStartTime = Time.monotonicNow();
        final DatanodeCommand command = this.bpNamenode.cacheReport(this.bpRegistration, bpid, cachedBlockIds);

        // Split the elapsed time into "building the report" and "RPC round trip".
        final long generationMs = rpcStartTime - startTime;
        final long rpcMs = Time.monotonicNow() - rpcStartTime;
        this.dn.getMetrics().addCacheReport(rpcMs);
        if (LOG.isDebugEnabled()) {
            LOG.debug("CacheReport of " + cachedBlockIds.size() + " block(s) took " + generationMs + " msec to generate and " + rpcMs + " msecs for RPC and NN processing");
        }
        return command;
    }

    /**
     * Sums a size figure over all storage block reports.
     *
     * @param z when {@code true}, each report contributes the size of its blocks buffer;
     *          otherwise it contributes 10 times the length of its block-list-as-longs array
     *          (NOTE(review): the factor 10 is presumably a per-entry size estimate; confirm)
     * @param storageBlockReportArr the reports to measure
     * @return the accumulated size
     */
    private int calculateBlockReportPBSize(boolean z, StorageBlockReport[] storageBlockReportArr) {
        int totalSize = 0;
        for (StorageBlockReport report : storageBlockReportArr) {
            totalSize += z
                    ? report.getBlocks().getBlocksBuffer().size()
                    : 10 * report.getBlocks().getBlockListAsLongs().length;
        }
        return totalSize;
    }

    /**
     * Schedules the next heartbeat, then sends one heartbeat RPC carrying the current storage
     * reports, cache figures, transfer counts and volume-failure information.
     *
     * <p>Slow-peer and slow-disk reports are attached only when the scheduler says an outlier
     * report is due and the corresponding metrics object exists; otherwise the empty report
     * constants are sent. After a due outlier report, the next one is scheduled.
     *
     * @param z forwarded unchanged as the boolean argument of the heartbeat RPC
     * @return the response returned by the heartbeat RPC
     * @throws IOException if collecting the reports or the RPC fails
     */
    HeartbeatResponse sendHeartBeat(boolean z) throws IOException {
        this.scheduler.scheduleNextHeartbeat();
        StorageReport[] reports = this.dn.getFSDataset().getStorageReports(this.bpos.getBlockPoolId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sending heartbeat with " + reports.length + " storage reports from service actor: " + this);
        }

        final long now = Time.monotonicNow();
        this.scheduler.updateLastHeartbeatTime(now);

        VolumeFailureSummary failureSummary = this.dn.getFSDataset().getVolumeFailureSummary();
        int failedVolumeCount = 0;
        if (failureSummary != null) {
            failedVolumeCount = failureSummary.getFailedStorageLocations().length;
        }

        final boolean outliersDue = this.scheduler.isOutliersReportDue(now);
        HeartbeatResponse response = this.bpNamenode.sendHeartbeat(
                this.bpRegistration,
                reports,
                this.dn.getFSDataset().getCacheCapacity(),
                this.dn.getFSDataset().getCacheUsed(),
                this.dn.getXmitsInProgress(),
                this.dn.getActiveTransferThreadCount(),
                failedVolumeCount,
                failureSummary,
                z,
                (outliersDue && this.dn.getPeerMetrics() != null)
                        ? SlowPeerReports.create(this.dn.getPeerMetrics().getOutliers())
                        : SlowPeerReports.EMPTY_REPORT,
                (outliersDue && this.dn.getDiskMetrics() != null)
                        ? SlowDiskReports.create(this.dn.getDiskMetrics().getDiskOutliersStats())
                        : SlowDiskReports.EMPTY_REPORT);

        if (outliersDue) {
            this.scheduler.scheduleNextOutlierReport();
        }
        return response;
    }

    /**
     * Test hook: makes the lifeline sender send one lifeline message immediately.
     *
     * @throws IOException if sending the lifeline fails
     */
    @VisibleForTesting
    void sendLifelineForTests() throws IOException {
        lifelineSender.sendLifeline();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Starts the actor thread (and the lifeline sender, if there is one).
     *
     * <p>Does nothing when an actor thread already exists and is still alive.
     */
    public void start() {
        if (this.bpThread != null && this.bpThread.isAlive()) {
            // Already running.
            return;
        }
        this.bpThread = new Thread(this);
        this.bpThread.setDaemon(true);
        if (this.lifelineSender != null) {
            this.lifelineSender.start();
        }
        this.bpThread.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a thread name of the form {@code "<prefix> <str> to <address>"}.
     *
     * <p>The prefix is the value of {@code bpos.getBlockPoolId(true)}; when that is
     * {@code null}, the nameservice id is used instead.
     *
     * @param str               description of what the thread does
     * @param inetSocketAddress address the thread talks to
     * @return the formatted thread name
     */
    public String formatThreadName(String str, InetSocketAddress inetSocketAddress) {
        String prefix = this.bpos.getBlockPoolId(true);
        if (prefix == null) {
            prefix = this.bpos.getNameserviceId();
        }
        return prefix + " " + str + " to " + inetSocketAddress;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks this actor to stop: clears the run flag, stops the lifeline sender, and interrupts
     * the actor thread and the command-processing thread. Each of those is skipped when it
     * does not exist.
     */
    public void stop() {
        shouldServiceRun = false;

        if (lifelineSender != null) {
            lifelineSender.stop();
        }
        // Interrupt the worker threads so they leave any blocking call.
        if (bpThread != null) {
            bpThread.interrupt();
        }
        if (commandProcessingThread != null) {
            commandProcessingThread.interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Waits for the lifeline sender and then the actor thread to finish, skipping whichever
     * of the two does not exist.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
     * thread's interrupt status is restored so the caller can still observe the interruption.
     */
    public void join() {
        try {
            if (this.lifelineSender != null) {
                this.lifelineSender.join();
            }
            if (this.bpThread != null) {
                this.bpThread.join();
            }
        } catch (InterruptedException e) {
            // join() cleared the flag when it threw; set it again rather than discarding it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Final teardown for this actor: clears the run flag, closes the NameNode proxy and the
     * lifeline sender via {@code IOUtils.cleanup} (no logger supplied), and tells the owning
     * {@code bpos} to shut this actor down.
     */
    private synchronized void cleanUp() {
        this.shouldServiceRun = false;
        IOUtils.cleanup((Log) null, this.bpNamenode);
        IOUtils.cleanup((Log) null, this.lifelineSender);
        this.bpos.shutdownActor(this);
    }

    /**
     * Forwards the rolling-upgrade status carried by a heartbeat response to {@code bpos}.
     *
     * <p>A {@code null} status is forwarded as-is. A non-null status whose block pool id
     * differs from this actor's block pool id is logged as an error and not forwarded.
     *
     * @param heartbeatResponse the response to read the status from
     * @throws IOException if signalling the rolling upgrade fails
     */
    private void handleRollingUpgradeStatus(HeartbeatResponse heartbeatResponse) throws IOException {
        RollingUpgradeStatus status = heartbeatResponse.getRollingUpdateStatus();
        if (status != null && status.getBlockPoolId().compareTo(this.bpos.getBlockPoolId()) != 0) {
            LOG.error("Invalid BlockPoolId " + status.getBlockPoolId() + " in HeartbeatResponse. Expected " + this.bpos.getBlockPoolId());
            return;
        }
        this.bpos.signalRollingUpgrade(status);
    }

    /**
     * Main service loop; runs until {@link #shouldRun()} returns {@code false}.
     *
     * <p>Each iteration:
     * <ol>
     *   <li>sends a heartbeat when one is due (unless disabled for tests), records any full
     *       block report lease id it returns, updates the HA state and enqueues the returned
     *       commands;</li>
     *   <li>sends incremental block reports when the IBR manager asks for it or a heartbeat
     *       was due;</li>
     *   <li>sends a full block report when a lease id is held or one was forced;</li>
     *   <li>sends a cache report (unless disabled for tests);</li>
     *   <li>waits for the next IBR / heartbeat, then processes queued actor actions.</li>
     * </ol>
     *
     * <p>A {@code RemoteException} whose class name is {@code UnregisteredNodeException},
     * {@code DisallowedDatanodeException} or {@code IncorrectVersionException} clears the run
     * flag and returns. Any other {@code RemoteException} or {@code IOException} is logged and
     * followed by a short sleep before the next iteration. The fault injector's
     * {@code endOfferService()} hook runs exactly once per iteration via {@code finally}.
     *
     * @throws Exception any non-I/O failure raised by an iteration
     */
    private void offerService() throws Exception {
        LOG.info("For namenode " + this.nnAddr + " using BLOCKREPORT_INTERVAL of " + this.dnConf.blockReportInterval + "msec CACHEREPORT_INTERVAL of " + this.dnConf.cacheReportInterval + "msec Initial delay: " + this.dnConf.initialBlockReportDelayMs + "msec; heartBeatInterval=" + this.dnConf.heartBeatInterval + (this.lifelineSender != null ? "; lifelineIntervalMs=" + this.dnConf.getLifelineIntervalMs() : ""));
        while (shouldRun()) {
            try {
                DataNodeFaultInjector.get().startOfferService();
                final long startTime = this.scheduler.monotonicNow();
                final boolean heartbeatDue = this.scheduler.isHeartbeatDue(startTime);
                if (heartbeatDue) {
                    // Only ask for a lease when none is held and a block report is due.
                    final boolean requestLease = this.fullBlockReportLeaseId == 0 && this.scheduler.isBlockReportDue(startTime);
                    if (!this.dn.areHeartbeatsDisabledForTests()) {
                        HeartbeatResponse response = sendHeartBeat(requestLease);
                        // Explicit form of `assert response != null`, using the class's own flag.
                        if (!$assertionsDisabled && response == null) {
                            throw new AssertionError();
                        }
                        if (response.getFullBlockReportLeaseId() != 0) {
                            if (this.fullBlockReportLeaseId != 0) {
                                LOG.warn(this.nnAddr + " sent back a full block report lease ID of 0x" + Long.toHexString(response.getFullBlockReportLeaseId()) + ", but we already have a lease ID of 0x" + Long.toHexString(this.fullBlockReportLeaseId) + ". Overwriting old lease ID.");
                            }
                            this.fullBlockReportLeaseId = response.getFullBlockReportLeaseId();
                        }
                        this.dn.getMetrics().addHeartbeat(this.scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
                        // Update the HA state before the commands from this heartbeat are enqueued.
                        this.bpos.updateActorStatesFromHeartbeat(this, response.getNameNodeHaState());
                        this.state = response.getNameNodeHaState().getState();
                        if (this.state == HAServiceProtocol.HAServiceState.ACTIVE) {
                            handleRollingUpgradeStatus(response);
                        }
                        this.commandProcessingThread.enqueue(response.getCommands());
                    }
                }
                if (!this.dn.areIBRDisabledForTests() && (this.ibrManager.sendImmediately() || heartbeatDue)) {
                    this.ibrManager.sendIBRs(this.bpNamenode, this.bpRegistration, this.bpos.getBlockPoolId(), getRpcMetricSuffix());
                }
                List<DatanodeCommand> blockReportCommands = null;
                final boolean forceFullReport = this.scheduler.forceFullBlockReport.getAndSet(false);
                if (forceFullReport) {
                    LOG.info("Forcing a full block report to " + this.nnAddr);
                }
                if (this.fullBlockReportLeaseId != 0 || forceFullReport) {
                    blockReportCommands = blockReport(this.fullBlockReportLeaseId);
                    // The lease id is reset once the report has been sent with it.
                    this.fullBlockReportLeaseId = 0L;
                }
                this.commandProcessingThread.enqueue(blockReportCommands);
                if (!this.dn.areCacheReportsDisabledForTests()) {
                    this.commandProcessingThread.enqueue(cacheReport());
                }
                if (heartbeatDue) {
                    this.dn.getMetrics().addHeartbeatTotal(this.scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
                }
                // Nothing left to do this round: wait for the next IBR or heartbeat.
                this.ibrManager.waitTillNextIBR(this.scheduler.getHeartbeatWaitTime());
            } catch (RemoteException e) {
                String className = e.getClassName();
                if (UnregisteredNodeException.class.getName().equals(className) || DisallowedDatanodeException.class.getName().equals(className) || IncorrectVersionException.class.getName().equals(className)) {
                    LOG.warn(this + " is shutting down", e);
                    this.shouldServiceRun = false;
                    return;
                }
                LOG.warn("RemoteException in offerService", e);
                sleepAfterException();
            } catch (IOException e) {
                LOG.warn("IOException in offerService", e);
                sleepAfterException();
            } finally {
                DataNodeFaultInjector.get().endOfferService();
            }
            processQueueMessages();
        }
    }

    /**
     * Pauses after a failed service iteration for the heartbeat interval, capped at one second.
     * If interrupted, the sleep ends early and the thread's interrupt status is restored.
     */
    private void sleepAfterException() {
        final long pauseMs = Math.min(1000L, this.dnConf.heartBeatInterval);
        try {
            Thread.sleep(pauseMs);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers this datanode with the NameNode, retrying once a second on connection problems
     * for as long as {@link #shouldRun()} holds.
     *
     * <p>On success the registration returned by the NameNode (with {@code namespaceInfo}
     * attached) becomes {@code bpRegistration}, {@code bpos} is notified, the full block
     * report lease id is reset and the first block report is scheduled after
     * {@code dnConf.initialBlockReportDelayMs}.
     *
     * @param namespaceInfo namespace information to attach to the new registration
     * @throws IOException if the NameNode answers with a {@code RemoteException} (rethrown
     *                     without retry), or if the actor stopped running before any
     *                     registration was ever obtained
     */
    void register(NamespaceInfo namespaceInfo) throws IOException {
        DatanodeRegistration newRegistration = this.bpos.createRegistration();
        LOG.info(this + " beginning handshake with NN");
        while (shouldRun()) {
            try {
                // Keep the registration returned by the NameNode, not the one we sent.
                newRegistration = this.bpNamenode.registerDatanode(newRegistration);
                newRegistration.setNamespaceInfo(namespaceInfo);
                this.bpRegistration = newRegistration;
                break;
            } catch (EOFException e) {
                LOG.info("Problem connecting to server: " + this.nnAddr + " :" + e.getLocalizedMessage());
                sleepAndLogInterrupts(1000, "connecting to server");
            } catch (SocketTimeoutException e) {
                LOG.info("Problem connecting to server: " + this.nnAddr);
                sleepAndLogInterrupts(1000, "connecting to server");
            } catch (RemoteException e) {
                // Must precede the IOException clause: RemoteException is an IOException, and
                // a server-side rejection has to fail fast instead of being retried.
                LOG.warn("RemoteException in register", e);
                throw e;
            } catch (IOException e) {
                LOG.warn("Problem connecting to server: " + this.nnAddr);
                sleepAndLogInterrupts(1000, "connecting to server");
            }
        }
        if (this.bpRegistration == null) {
            // The loop ended because the actor is stopping, not because registration worked.
            throw new IOException("DN shut down before block pool registered");
        }
        LOG.info("Block pool " + this + " successfully registered with NN");
        this.bpos.registrationSucceeded(this, this.bpRegistration);
        this.fullBlockReportLeaseId = 0L;
        this.scheduler.scheduleBlockReport(this.dnConf.initialBlockReportDelayMs);
    }

    /**
     * Sleeps for the given time; an interruption ends the sleep early and is only logged.
     *
     * @param i   sleep time in milliseconds
     * @param str description of the interrupted activity, used in the log message
     */
    private void sleepAndLogInterrupts(int i, String str) {
        final long sleepMillis = i;
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException interrupted) {
            LOG.info("BPOfferService " + this + " interrupted while " + str);
        }
    }

    /**
     * Thread body of the actor.
     *
     * <p>Phase 1 connects to the NameNode and performs the handshake. An {@code IOException}
     * there sets the state to {@code INIT_FAILED}; while {@link #shouldRetryInit()} holds the
     * attempt is repeated after five seconds, otherwise the state becomes {@code FAILED} and
     * the method returns.
     *
     * <p>Phase 2 marks the actor {@code RUNNING}, releases {@code initialRegistrationComplete}
     * if present, and calls {@link #offerService()} until {@link #shouldRun()} is false,
     * logging any exception and pausing five seconds before retrying. A normal exit sets the
     * state to {@code EXITED}.
     *
     * <p>Any other throwable sets the state to {@code FAILED}. In every case the end of the
     * service is logged and {@link #cleanUp()} runs exactly once.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOG.info(this + " starting to offer service");
        try {
            // Phase 1: handshake, retried while initialization may still succeed.
            while (true) {
                try {
                    connectToNNAndHandshake();
                    break;
                } catch (IOException e) {
                    this.runningState = RunningState.INIT_FAILED;
                    if (!shouldRetryInit()) {
                        this.runningState = RunningState.FAILED;
                        LOG.error("Initialization failed for " + this + ". Exiting. ", e);
                        return;
                    }
                    LOG.error("Initialization failed for " + this + " " + e.getLocalizedMessage());
                    sleepAndLogInterrupts(5000, "initializing");
                }
            }

            this.runningState = RunningState.RUNNING;
            if (this.initialRegistrationComplete != null) {
                this.initialRegistrationComplete.countDown();
            }

            // Phase 2: serve until told to stop; one failed round must not end the actor.
            while (shouldRun()) {
                try {
                    offerService();
                } catch (Exception e) {
                    LOG.error("Exception in BPOfferService for " + this, e);
                    sleepAndLogInterrupts(5000, "offering service");
                }
            }
            this.runningState = RunningState.EXITED;
        } catch (Throwable th) {
            LOG.warn("Unexpected exception in block pool " + this, th);
            this.runningState = RunningState.FAILED;
        } finally {
            LOG.warn("Ending block pool service for: " + this);
            cleanUp();
        }
    }

    /**
     * @return {@code true} only when this actor should still run and {@code bpos} also allows
     *         initialization to be retried
     */
    private boolean shouldRetryInit() {
        if (!shouldRun()) {
            return false;
        }
        return this.bpos.shouldRetryInit();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} only when this actor's own run flag is set and the datanode
     *         reports that it should run
     */
    public boolean shouldRun() {
        if (!this.shouldServiceRun) {
            return false;
        }
        return this.dn.shouldRun();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports one bad block, located on the given datanode, to the NameNode.
     *
     * @param datanodeInfo  the datanode holding the bad block
     * @param extendedBlock the bad block
     * @throws IOException if the RPC fails
     */
    public void reportRemoteBadBlock(DatanodeInfo datanodeInfo, ExtendedBlock extendedBlock) throws IOException {
        DatanodeInfo[] locations = {datanodeInfo};
        LocatedBlock[] badBlocks = {new LocatedBlock(extendedBlock, locations)};
        this.bpNamenode.reportBadBlocks(badBlocks);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers with the NameNode again, if this actor should still run.
     *
     * <p>Namespace info is fetched first. Pending incremental block reports are cleared when
     * the last known HA state is {@code STANDBY} or {@code OBSERVER}. After registration a
     * heartbeat is scheduled and the fault injector's {@code blockUtilSendFullBlockReport}
     * hook is invoked.
     *
     * @throws IOException if fetching the namespace info or registering fails
     */
    public void reRegister() throws IOException {
        if (!shouldRun()) {
            return;
        }
        NamespaceInfo nsInfo = retrieveNamespaceInfo();
        if (this.state == HAServiceProtocol.HAServiceState.STANDBY
                || this.state == HAServiceProtocol.HAServiceState.OBSERVER) {
            this.ibrManager.clearIBRs();
        }
        register(nsInfo);
        this.scheduler.scheduleHeartbeat();
        DataNodeFaultInjector.get().blockUtilSendFullBlockReport();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules a block report of the kind requested by the options.
     *
     * <p>An incremental request triggers the IBR manager. Otherwise a full block report is
     * forced on the scheduler and threads waiting on the IBR manager's monitor are woken.
     *
     * @param blockReportOptions selects incremental versus full report
     */
    public void triggerBlockReport(BlockReportOptions blockReportOptions) {
        if (!blockReportOptions.isIncremental()) {
            LOG.info(this.bpos.toString() + ": scheduling a full block report.");
            synchronized (this.ibrManager) {
                this.scheduler.forceFullBlockReportNow();
                this.ibrManager.notifyAll();
            }
        } else {
            LOG.info(this.bpos.toString() + ": scheduling an incremental block report.");
            this.ibrManager.triggerIBR(true);
        }
    }

    /**
     * Adds an action to the actor's queue unless the queue already contains it. The check and
     * the insertion happen under the queue's monitor.
     *
     * @param bPServiceActorAction the action to enqueue
     */
    public void bpThreadEnqueue(BPServiceActorAction bPServiceActorAction) {
        synchronized (this.bpThreadQueue) {
            if (this.bpThreadQueue.contains(bPServiceActorAction)) {
                return;
            }
            this.bpThreadQueue.add(bPServiceActorAction);
        }
    }

    /**
     * Drains the action queue and reports each drained action to the NameNode.
     *
     * <p>The queue is copied and cleared under its monitor; the actions are then reported in
     * queue order outside the monitor. An action that fails with
     * {@code BPServiceActorActionException} is logged and put back on the queue.
     */
    private void processQueueMessages() {
        final List<BPServiceActorAction> pending;
        synchronized (this.bpThreadQueue) {
            pending = new ArrayList<BPServiceActorAction>(this.bpThreadQueue);
            this.bpThreadQueue.clear();
        }
        for (BPServiceActorAction action : pending) {
            try {
                action.reportTo(this.bpNamenode, this.bpRegistration);
            } catch (BPServiceActorActionException e) {
                LOG.warn(e.getMessage() + this.nnAddr, e);
                bpThreadEnqueue(action);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the scheduler instance held by this actor
     */
    public Scheduler getScheduler() {
        return scheduler;
    }

    /**
     * Test hook: interrupts the command-processing thread, if there is one.
     */
    @VisibleForTesting
    void stopCommandProcessingThread() {
        if (this.commandProcessingThread == null) {
            return;
        }
        this.commandProcessingThread.interrupt();
    }

    // Class initializer: records whether assertions are disabled for BPServiceActor (read by
    // the explicit assertion check in offerService) and reuses the DataNode logger as LOG.
    static {
        $assertionsDisabled = !BPServiceActor.class.desiredAssertionStatus();
        LOG = DataNode.LOG;
    }
}
