package org.apache.hadoop.hbase.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/SplitLogWorker.class */
public class SplitLogWorker extends ZooKeeperListener implements Runnable {
    private static final Log LOG;
    Thread worker;
    private final String serverName;
    private final TaskExecutor splitTaskExecutor;
    private long zkretries;
    private Object taskReadyLock;
    volatile int taskReadySeq;
    private volatile String currentTask;
    private int currentVersion;
    private volatile boolean exitWorker;
    private Object grabTaskLock;
    private boolean workerInGrabTask;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/SplitLogWorker$GetDataAsyncCallback.class */
    public class GetDataAsyncCallback implements AsyncCallback.DataCallback {
        private final Log LOG = LogFactory.getLog(GetDataAsyncCallback.class);

        GetDataAsyncCallback() {
        }

        public void processResult(int i, String str, Object obj, byte[] bArr, Stat stat) {
            ZKSplitLog.Counters.tot_wkr_get_data_result.incrementAndGet();
            if (i != 0) {
                this.LOG.warn("getdata rc = " + KeeperException.Code.get(i) + " " + str);
                SplitLogWorker.this.getDataSetWatchFailure(str);
            } else {
                SplitLogWorker.this.getDataSetWatchSuccess(str, SplitLogWorker.this.watcher.getRecoverableZooKeeper().removeMetaData(bArr));
            }
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/SplitLogWorker$TaskExecutor.class */
    public interface TaskExecutor {

        /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/SplitLogWorker$TaskExecutor$Status.class */
        public enum Status {
            DONE,
            ERR,
            RESIGNED,
            PREEMPTED
        }

        Status exec(String str, CancelableProgressable cancelableProgressable);
    }

    public SplitLogWorker(ZooKeeperWatcher zooKeeperWatcher, Configuration configuration, String str, TaskExecutor taskExecutor) {
        super(zooKeeperWatcher);
        this.taskReadyLock = new Object();
        this.taskReadySeq = 0;
        this.currentTask = null;
        this.grabTaskLock = new Object();
        this.workerInGrabTask = false;
        this.serverName = str;
        this.splitTaskExecutor = taskExecutor;
        this.zkretries = configuration.getLong("hbase.splitlog.zk.retries", 3L);
    }

    public SplitLogWorker(ZooKeeperWatcher zooKeeperWatcher, final Configuration configuration, String str) {
        this(zooKeeperWatcher, configuration, str, new TaskExecutor() { // from class: org.apache.hadoop.hbase.regionserver.SplitLogWorker.1
            @Override // org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor
            public TaskExecutor.Status exec(String str2, CancelableProgressable cancelableProgressable) {
                try {
                    Path rootDir = FSUtils.getRootDir(configuration);
                    FileSystem fileSystem = rootDir.getFileSystem(configuration);
                    try {
                        return !HLogSplitter.splitLogFile(rootDir, fileSystem.getFileStatus(new Path(str2)), fileSystem, configuration, cancelableProgressable) ? TaskExecutor.Status.PREEMPTED : TaskExecutor.Status.DONE;
                    } catch (InterruptedIOException e) {
                        SplitLogWorker.LOG.warn("log splitting of " + str2 + " interrupted, resigning", e);
                        return TaskExecutor.Status.RESIGNED;
                    } catch (IOException e2) {
                        if (e2.getCause() instanceof InterruptedException) {
                            SplitLogWorker.LOG.warn("log splitting of " + str2 + " interrupted, resigning", e2);
                            return TaskExecutor.Status.RESIGNED;
                        }
                        SplitLogWorker.LOG.warn("log splitting of " + str2 + " failed, returning error", e2);
                        return TaskExecutor.Status.ERR;
                    }
                } catch (IOException e3) {
                    SplitLogWorker.LOG.warn("could not find root dir or fs", e3);
                    return TaskExecutor.Status.RESIGNED;
                }
            }
        });
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            try {
                LOG.info("SplitLogWorker " + this.serverName + " starting");
                this.watcher.registerListener(this);
                int i = -1;
                while (i == -1) {
                    try {
                        i = ZKUtil.checkExists(this.watcher, this.watcher.splitLogZNode);
                    } catch (KeeperException e) {
                        LOG.warn("Exception when checking for " + this.watcher.splitLogZNode + " ... retrying", e);
                    }
                    if (i == -1) {
                        try {
                            LOG.info(this.watcher.splitLogZNode + " znode does not exist, waiting for master to create one");
                            Thread.sleep(1000L);
                        } catch (InterruptedException e2) {
                            LOG.debug("Interrupted while waiting for " + this.watcher.splitLogZNode);
                            if (!$assertionsDisabled && !this.exitWorker) {
                                throw new AssertionError();
                            }
                        }
                    }
                }
                taskLoop();
                LOG.info("SplitLogWorker " + this.serverName + " exiting");
            } catch (Throwable th) {
                LOG.error("unexpected error ", th);
                LOG.info("SplitLogWorker " + this.serverName + " exiting");
            }
        } catch (Throwable th2) {
            LOG.info("SplitLogWorker " + this.serverName + " exiting");
            throw th2;
        }
    }

    private void taskLoop() {
        while (true) {
            int i = this.taskReadySeq;
            List<String> taskList = getTaskList();
            if (taskList == null) {
                LOG.warn("Could not get tasks, did someone remove " + this.watcher.splitLogZNode + " ... worker thread exiting.");
                return;
            }
            int random = (int) (Math.random() * taskList.size());
            for (int i2 = 0; i2 < taskList.size(); i2++) {
                grabTask(ZKUtil.joinZNode(this.watcher.splitLogZNode, taskList.get((i2 + random) % taskList.size())));
                if (this.exitWorker) {
                    return;
                }
            }
            synchronized (this.taskReadyLock) {
                while (i == this.taskReadySeq) {
                    try {
                        this.taskReadyLock.wait();
                    } catch (InterruptedException e) {
                        LOG.info("SplitLogWorker interrupted while waiting for task, exiting: " + e.toString());
                        if (!$assertionsDisabled && !this.exitWorker) {
                            throw new AssertionError();
                        }
                        return;
                    }
                }
            }
        }
    }

    private void grabTask(String str) {
        Stat stat = new Stat();
        synchronized (this.grabTaskLock) {
            this.currentTask = str;
            this.workerInGrabTask = true;
            if (Thread.interrupted()) {
                return;
            }
            try {
                try {
                    byte[] dataNoWatch = ZKUtil.getDataNoWatch(this.watcher, str, stat);
                    if (dataNoWatch == null) {
                        ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
                        if (-1 > 0) {
                            LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - (-1)) + "ms");
                        }
                        synchronized (this.grabTaskLock) {
                            this.workerInGrabTask = false;
                            Thread.interrupted();
                        }
                        return;
                    }
                    if (!ZKSplitLog.TaskState.TASK_UNASSIGNED.equals(dataNoWatch)) {
                        ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
                        if (-1 > 0) {
                            LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - (-1)) + "ms");
                        }
                        synchronized (this.grabTaskLock) {
                            this.workerInGrabTask = false;
                            Thread.interrupted();
                        }
                        return;
                    }
                    this.currentVersion = stat.getVersion();
                    if (!attemptToOwnTask(true)) {
                        ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
                        if (-1 > 0) {
                            LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - (-1)) + "ms");
                        }
                        synchronized (this.grabTaskLock) {
                            this.workerInGrabTask = false;
                            Thread.interrupted();
                        }
                        return;
                    }
                    if (ZKSplitLog.isRescanNode(this.watcher, this.currentTask)) {
                        endTask(ZKSplitLog.TaskState.TASK_DONE, ZKSplitLog.Counters.tot_wkr_task_acquired_rescan);
                        if (-1 > 0) {
                            LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - (-1)) + "ms");
                        }
                        synchronized (this.grabTaskLock) {
                            this.workerInGrabTask = false;
                            Thread.interrupted();
                        }
                        return;
                    }
                    LOG.info("worker " + this.serverName + " acquired task " + str);
                    ZKSplitLog.Counters.tot_wkr_task_acquired.incrementAndGet();
                    getDataSetWatchAsync();
                    long currentTimeMillis = System.currentTimeMillis();
                    switch (this.splitTaskExecutor.exec(ZKSplitLog.getFileName(this.currentTask), new CancelableProgressable() { // from class: org.apache.hadoop.hbase.regionserver.SplitLogWorker.2
                        @Override // org.apache.hadoop.hbase.util.CancelableProgressable
                        public boolean progress() {
                            if (SplitLogWorker.this.attemptToOwnTask(false)) {
                                return true;
                            }
                            SplitLogWorker.LOG.warn("Failed to heartbeat the task" + SplitLogWorker.this.currentTask);
                            return false;
                        }
                    })) {
                        case DONE:
                            endTask(ZKSplitLog.TaskState.TASK_DONE, ZKSplitLog.Counters.tot_wkr_task_done);
                            break;
                        case PREEMPTED:
                            ZKSplitLog.Counters.tot_wkr_preempt_task.incrementAndGet();
                            LOG.warn("task execution prempted " + str);
                            break;
                        case ERR:
                            if (!this.exitWorker) {
                                endTask(ZKSplitLog.TaskState.TASK_ERR, ZKSplitLog.Counters.tot_wkr_task_err);
                                break;
                            }
                        case RESIGNED:
                            if (!this.exitWorker) {
                                ZKSplitLog.Counters.tot_wkr_preempt_task.incrementAndGet();
                                LOG.info("task execution interrupted via zk by manager " + str);
                                break;
                            } else {
                                LOG.info("task execution interrupted because worker is exiting " + str);
                                endTask(ZKSplitLog.TaskState.TASK_RESIGNED, ZKSplitLog.Counters.tot_wkr_task_resigned);
                                break;
                            }
                    }
                    if (currentTimeMillis > 0) {
                        LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - currentTimeMillis) + "ms");
                    }
                    synchronized (this.grabTaskLock) {
                        this.workerInGrabTask = false;
                        Thread.interrupted();
                    }
                } catch (KeeperException e) {
                    LOG.warn("Failed to get data for znode " + str, e);
                    ZKSplitLog.Counters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
                    if (-1 > 0) {
                        LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - (-1)) + "ms");
                    }
                    synchronized (this.grabTaskLock) {
                        this.workerInGrabTask = false;
                        Thread.interrupted();
                    }
                }
            } catch (Throwable th) {
                if (-1 > 0) {
                    LOG.info("worker " + this.serverName + " done with task " + str + " in " + (System.currentTimeMillis() - (-1)) + "ms");
                }
                synchronized (this.grabTaskLock) {
                    this.workerInGrabTask = false;
                    Thread.interrupted();
                    throw th;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean attemptToOwnTask(boolean z) {
        try {
            Stat data = this.watcher.getRecoverableZooKeeper().setData(this.currentTask, ZKSplitLog.TaskState.TASK_OWNED.get(this.serverName), this.currentVersion);
            if (data == null) {
                LOG.warn("zk.setData() returned null for path " + this.currentTask);
                ZKSplitLog.Counters.tot_wkr_task_heartbeat_failed.incrementAndGet();
                return false;
            }
            this.currentVersion = data.getVersion();
            ZKSplitLog.Counters.tot_wkr_task_heartbeat.incrementAndGet();
            return true;
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while trying to assert ownership of " + this.currentTask + " " + StringUtils.stringifyException(e));
            Thread.currentThread().interrupt();
            ZKSplitLog.Counters.tot_wkr_task_heartbeat_failed.incrementAndGet();
            return false;
        } catch (KeeperException e2) {
            if (!z) {
                if (e2.code().equals(KeeperException.Code.NONODE)) {
                    LOG.warn("NONODE failed to assert ownership for " + this.currentTask, e2);
                } else if (e2.code().equals(KeeperException.Code.BADVERSION)) {
                    LOG.warn("BADVERSION failed to assert ownership for " + this.currentTask, e2);
                } else {
                    LOG.warn("failed to assert ownership for " + this.currentTask, e2);
                }
            }
            ZKSplitLog.Counters.tot_wkr_task_heartbeat_failed.incrementAndGet();
            return false;
        }
    }

    private void endTask(ZKSplitLog.TaskState taskState, AtomicLong atomicLong) {
        String str = this.currentTask;
        this.currentTask = null;
        try {
        } catch (KeeperException e) {
            LOG.warn("failed to end task, " + str + " " + taskState, e);
        } catch (KeeperException.NoNodeException e2) {
            LOG.fatal("logic error - end task " + str + " " + taskState + " failed because task doesn't exist", e2);
        } catch (KeeperException.BadVersionException e3) {
            LOG.warn("transisition task " + str + " to " + taskState + " failed because of version mismatch", e3);
        }
        if (ZKUtil.setData(this.watcher, str, taskState.get(this.serverName), this.currentVersion)) {
            LOG.info("successfully transitioned task " + str + " to final state " + taskState);
            atomicLong.incrementAndGet();
        } else {
            LOG.warn("failed to transistion task " + str + " to end state " + taskState + " because of version mismatch ");
            ZKSplitLog.Counters.tot_wkr_final_transistion_failed.incrementAndGet();
        }
    }

    void getDataSetWatchAsync() {
        this.watcher.getRecoverableZooKeeper().getZooKeeper().getData(this.currentTask, this.watcher, new GetDataAsyncCallback(), (Object) null);
        ZKSplitLog.Counters.tot_wkr_get_data_queued.incrementAndGet();
    }

    void getDataSetWatchSuccess(String str, byte[] bArr) {
        String str2;
        synchronized (this.grabTaskLock) {
            if (this.workerInGrabTask && (str2 = this.currentTask) != null && str2.equals(str) && !ZKSplitLog.TaskState.TASK_OWNED.equals(bArr, this.serverName) && !ZKSplitLog.TaskState.TASK_DONE.equals(bArr, this.serverName) && !ZKSplitLog.TaskState.TASK_ERR.equals(bArr, this.serverName) && !ZKSplitLog.TaskState.TASK_RESIGNED.equals(bArr, this.serverName)) {
                LOG.info("task " + str2 + " preempted from " + this.serverName + ", current task state and owner=" + new String(bArr));
                stopTask();
            }
        }
    }

    void getDataSetWatchFailure(String str) {
        String str2;
        synchronized (this.grabTaskLock) {
            if (this.workerInGrabTask && (str2 = this.currentTask) != null && str2.equals(str)) {
                LOG.info("retrying data watch on " + str);
                ZKSplitLog.Counters.tot_wkr_get_data_retry.incrementAndGet();
                getDataSetWatchAsync();
            }
        }
    }

    @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
    public void nodeDataChanged(String str) {
        String str2;
        synchronized (this.grabTaskLock) {
            if (this.workerInGrabTask && (str2 = this.currentTask) != null && str2.equals(str)) {
                getDataSetWatchAsync();
            }
        }
    }

    private List<String> getTaskList() {
        List<String> list = null;
        while (!this.exitWorker) {
            try {
                list = ZKUtil.listChildrenAndWatchForNewChildren(this.watcher, this.watcher.splitLogZNode);
                if (list != null) {
                    return list;
                }
            } catch (KeeperException e) {
                LOG.warn("Could not get children of znode " + this.watcher.splitLogZNode, e);
            }
            try {
                LOG.debug("Retry listChildren of znode " + this.watcher.splitLogZNode + " after sleep for 1000ms!");
                Thread.sleep(1000L);
            } catch (InterruptedException e2) {
                LOG.warn("Interrupted while trying to get task list ...", e2);
                Thread.currentThread().interrupt();
            }
        }
        return list;
    }

    @Override // org.apache.hadoop.hbase.zookeeper.ZooKeeperListener
    public void nodeChildrenChanged(String str) {
        if (str.equals(this.watcher.splitLogZNode)) {
            LOG.debug("tasks arrived or departed");
            synchronized (this.taskReadyLock) {
                this.taskReadySeq++;
                this.taskReadyLock.notify();
            }
        }
    }

    void stopTask() {
        LOG.info("Sending interrupt to stop the worker thread");
        this.worker.interrupt();
    }

    public void start() {
        this.worker = new Thread(null, this, "SplitLogWorker-" + this.serverName);
        this.exitWorker = false;
        this.worker.start();
    }

    public void stop() {
        this.exitWorker = true;
        stopTask();
    }

    static {
        $assertionsDisabled = !SplitLogWorker.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(SplitLogWorker.class);
    }
}
