/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorMR;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactorFactory;
import org.apache.hadoop.hive.ql.txn.compactor.RemoteCompactorThread;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker
extends RemoteCompactorThread
implements MetaStoreThread {
    private static final String CLASS_NAME = Worker.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);
    private static final long SLEEP_TIME = 10000L;
    private static final int TXN_ID_NOT_SET = -1;
    private String workerName;

    public static String hostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            LOG.error("Unable to resolve my host name " + e.getMessage());
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LOG.info("Starting Worker thread");
        boolean genericStats = this.conf.getBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS);
        boolean mrStats = this.conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
        long timeout = this.conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
        ExecutorService executor = this.getTimeoutHandlingExecutor();
        try {
            do {
                boolean launchedJob;
                Future<Boolean> singleRun = executor.submit(() -> this.findNextCompactionAndExecute(genericStats, mrStats));
                try {
                    launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException te) {
                    LOG.info("Timeout during executing compaction", (Throwable)te);
                    singleRun.cancel(true);
                    executor.shutdownNow();
                    executor = this.getTimeoutHandlingExecutor();
                    launchedJob = true;
                }
                catch (ExecutionException e) {
                    LOG.info("Exception during executing compaction", (Throwable)e);
                    launchedJob = true;
                }
                catch (InterruptedException ie) {
                    launchedJob = true;
                }
                if (!launchedJob && !this.stop.get()) {
                    try {
                        Thread.sleep(10000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                LOG.info("Worker thread finished one loop.");
            } while (!this.stop.get());
        }
        finally {
            if (executor != null) {
                executor.shutdownNow();
            }
            if (this.msc != null) {
                this.msc.close();
            }
        }
    }

    @VisibleForTesting
    public void verifyTableIdHasNotChanged(CompactionInfo ci, Table originalTable) throws HiveException, MetaException {
        Table currentTable = this.resolveTable(ci);
        if (originalTable.getId() != currentTable.getId()) {
            throw new HiveException("Table " + originalTable.getDbName() + "." + originalTable.getTableName() + " id (" + currentTable.getId() + ") is not equal to its id when compaction started (" + originalTable.getId() + "). The table might have been dropped and recreated while compaction was running. Marking compaction as failed.");
        }
    }

    private void commitTxnIfSet(long compactorTxnId) {
        if (compactorTxnId != -1L) {
            try {
                if (this.msc != null) {
                    this.msc.commitTxn(compactorTxnId);
                }
            }
            catch (TException e) {
                LOG.error("Caught an exception while committing compaction in worker " + this.workerName + ", " + StringUtils.stringifyException((Throwable)e));
            }
        }
    }

    @Override
    public void init(AtomicBoolean stop) throws Exception {
        super.init(stop);
        StringBuilder name = new StringBuilder(Worker.hostname());
        name.append("-");
        name.append(this.getId());
        this.workerName = name.toString();
        this.setName(name.toString());
    }

    static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
        boolean isEnoughToCompact;
        int deltaCount = dir.getCurrentDirectories().size();
        int origCount = dir.getOriginalFiles().size();
        StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
        if (isMajorCompaction) {
            isEnoughToCompact = origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1;
        } else {
            boolean bl = isEnoughToCompact = deltaCount > 1;
            if (deltaCount == 2) {
                Map<String, Long> deltaByType = dir.getCurrentDirectories().stream().collect(Collectors.groupingBy(delta -> delta.isDeleteDelta() ? "delete_delta_" : "delta_", Collectors.counting()));
                isEnoughToCompact = deltaByType.size() != deltaCount;
                deltaInfo.append(" ").append(deltaByType);
            }
        }
        if (!isEnoughToCompact) {
            LOG.info("Not enough files in {} to compact; current base: {}, delta files: {}, originals: {}", new Object[]{sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount});
        }
        return isEnoughToCompact;
    }

    public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) {
        boolean needsJustCleaning;
        int numObsoleteDirs = dir.getObsolete().size();
        boolean bl = needsJustCleaning = numObsoleteDirs > 0;
        if (needsJustCleaning) {
            LOG.info("{} obsolete directories in {} found; marked for cleaning.", (Object)numObsoleteDirs, (Object)sd.getLocation());
        }
        return needsJustCleaning;
    }

    private ExecutorService getTimeoutHandlingExecutor() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread masterThread = Thread.currentThread();
            Thread t = new Thread(masterThread.getThreadGroup(), r, masterThread.getName() + "_executor");
            t.setDaemon(masterThread.isDaemon());
            t.setPriority(masterThread.getPriority());
            return t;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @VisibleForTesting
    protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, boolean collectMrStats) {
        Table t1;
        boolean computeStats;
        CompactionInfo ci;
        block50: {
            CompactionHeartbeater heartbeater = null;
            long compactorTxnId = -1L;
            ci = null;
            computeStats = false;
            t1 = null;
            try {
                if (this.msc == null) {
                    try {
                        this.msc = HiveMetaStoreUtils.getHiveMetastoreClient(this.conf);
                    }
                    catch (Exception e) {
                        LOG.error("Failed to connect to HMS", (Throwable)e);
                        Boolean bl = false;
                        this.commitTxnIfSet(compactorTxnId);
                        if (heartbeater != null) {
                            heartbeater.cancel();
                        }
                        return bl;
                    }
                }
                ci = CompactionInfo.optionalCompactionInfoStructToInfo(this.msc.findNextCompact(this.workerName));
                LOG.debug("Processing compaction request " + ci);
                if (ci == null && !this.stop.get()) {
                    try {
                        Thread.sleep(10000L);
                        Boolean e = false;
                        return e;
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Worker thread sleep interrupted " + e.getMessage());
                        Boolean bl = false;
                        this.commitTxnIfSet(compactorTxnId);
                        if (heartbeater != null) {
                            heartbeater.cancel();
                        }
                        return bl;
                    }
                }
                this.checkInterrupt();
                try {
                    t1 = this.resolveTable(ci);
                    if (t1 == null) {
                        LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped and moving on.");
                        this.msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
                        Boolean e = false;
                        return e;
                    }
                }
                catch (MetaException e) {
                    this.msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
                    Boolean bl = false;
                    return bl;
                }
                this.checkInterrupt();
                Table t = t1;
                String fullTableName = TxnUtils.getFullTableName(t.getDbName(), t.getTableName());
                Partition p = null;
                try {
                    p = this.resolvePartition(ci);
                    if (p == null && ci.partName != null) {
                        LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped and moving on.");
                        this.msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
                        Boolean bl = false;
                        return bl;
                    }
                }
                catch (Exception e) {
                    this.msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
                    Boolean bl = false;
                    return bl;
                }
                this.checkInterrupt();
                StorageDescriptor sd = this.resolveStorageDescriptor(t, p);
                if (sd.getSortCols() != null && !sd.getSortCols().isEmpty()) {
                    LOG.error("Attempt to compact sorted table, which is not yet supported!");
                    this.msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
                    Boolean bl = false;
                    return bl;
                }
                if (ci.runAs == null) {
                    ci.runAs = this.findUserToRunAs(sd.getLocation(), t);
                }
                this.checkInterrupt();
                compactorTxnId = this.msc.openTxn(ci.runAs, TxnType.COMPACTION);
                heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, this.conf);
                heartbeater.start();
                ValidTxnList validTxnList = this.msc.getValidTxns(compactorTxnId);
                ValidCompactorWriteIdList tblValidWriteIds = TxnUtils.createValidCompactWriteIdList(this.msc.getValidWriteIds(Collections.singletonList(fullTableName), validTxnList.writeToString()).get(0));
                LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
                this.conf.set("hive.txn.valid.txns", validTxnList.writeToString());
                ci.highestWriteId = tblValidWriteIds.getHighWatermark();
                this.msc.updateCompactorState(CompactionInfo.compactionInfoToStruct(ci), compactorTxnId);
                this.checkInterrupt();
                StringBuilder jobName = new StringBuilder(this.workerName);
                jobName.append("-compactor-");
                jobName.append(ci.getFullPartitionName());
                AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), this.conf, tblValidWriteIds, Ref.from(false), true, null, false);
                if (!Worker.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
                    if (Worker.needsCleaning(dir, sd)) {
                        this.msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
                    } else {
                        this.msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
                    }
                    Boolean bl = false;
                    return bl;
                }
                this.checkInterrupt();
                try {
                    this.failCompactionIfSetForTest();
                    QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, this.conf, ci);
                    computeStats = queryCompactor == null && collectMrStats || collectGenericStats;
                    LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + ", id:" + ci.id + " in " + JavaUtils.txnIdToString(compactorTxnId) + " with compute stats set to " + computeStats);
                    if (queryCompactor != null) {
                        LOG.info("Will compact id: " + ci.id + " with query-based compactor class: " + queryCompactor.getClass().getName());
                        queryCompactor.runCompaction(this.conf, t, p, sd, tblValidWriteIds, ci, dir);
                    } else {
                        LOG.info("Will compact id: " + ci.id + " via MR job");
                        this.runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir);
                    }
                    heartbeater.cancel();
                    this.verifyTableIdHasNotChanged(ci, t1);
                    LOG.info("Completed " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in txn " + JavaUtils.txnIdToString(compactorTxnId) + ", marking as compacted.");
                    this.msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
                }
                catch (Exception e) {
                    LOG.error("Caught exception while trying to compact " + ci + ". Marking failed to avoid repeated failures", (Throwable)e);
                    CompactionType ctype = ci.type;
                    this.abortCompactionAndMarkFailed(ci, compactorTxnId, e);
                    if (this.runJobAsSelf(ci.runAs)) {
                        this.cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);
                        break block50;
                    }
                    LOG.info("Cleaning as user " + ci.runAs);
                    UserGroupInformation ugi = UserGroupInformation.createProxyUser((String)ci.runAs, (UserGroupInformation)UserGroupInformation.getLoginUser());
                    ugi.doAs(() -> {
                        this.cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);
                        return null;
                    });
                    try {
                        FileSystem.closeAllForUGI((UserGroupInformation)ugi);
                        break block50;
                    }
                    catch (IOException ex) {
                        LOG.error("Could not clean up file-system handles for UGI: " + ugi, (Throwable)e);
                    }
                }
                break block50;
                {
                    catch (IOException | TException t2) {
                        LOG.error("Caught an exception in the main loop of compactor worker " + this.workerName + ", " + StringUtils.stringifyException((Throwable)t2));
                        try {
                            this.abortCompactionAndMarkFailed(ci, compactorTxnId, t2);
                            break block50;
                        }
                        catch (TException e) {
                            LOG.error("Caught an exception while trying to mark compaction {} as failed: {}" + (compactorTxnId != -1L ? " or abort txnId " + compactorTxnId : ""), (Object)ci, (Object)e);
                            break block50;
                        }
                        finally {
                            if (this.msc != null) {
                                this.msc.close();
                                this.msc = null;
                            }
                        }
                    }
                    catch (Throwable t3) {
                        LOG.error("Caught an exception in the main loop of compactor worker " + this.workerName + ", " + StringUtils.stringifyException((Throwable)t3));
                        break block50;
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                this.commitTxnIfSet(compactorTxnId);
                if (heartbeater != null) {
                    heartbeater.cancel();
                }
            }
        }
        if (computeStats) {
            StatsUpdater.gatherStats(ci, this.conf, this.runJobAsSelf(ci.runAs) ? ci.runAs : t1.getOwner(), CompactorUtil.getCompactorJobQueueName(this.conf, ci, t1));
        }
        return true;
    }

    private void cleanupResultDirs(StorageDescriptor sd, ValidWriteIdList writeIds, CompactionType ctype, AcidUtils.Directory dir) {
        Path resultDir = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, this.conf, ctype == CompactionType.MAJOR, false, false, dir);
        LOG.info("Deleting result directories created by the compactor:\n");
        try {
            FileSystem fs = resultDir.getFileSystem((Configuration)this.conf);
            LOG.info(resultDir.toString());
            fs.delete(resultDir, true);
            if (ctype == CompactionType.MINOR) {
                Path deleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, this.conf, false, true, false, dir);
                LOG.info(deleteDeltaDir.toString());
                fs.delete(deleteDeltaDir, true);
            }
        }
        catch (IOException ex) {
            LOG.error("Caught exception while cleaning result directories:", (Throwable)ex);
        }
    }

    private void failCompactionIfSetForTest() {
        if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && this.conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
            throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
        }
    }

    private void runCompactionViaMrJob(CompactionInfo ci, Table t, Partition p, StorageDescriptor sd, ValidCompactorWriteIdList tblValidWriteIds, StringBuilder jobName, AcidUtils.Directory dir) throws IOException, InterruptedException {
        CompactorMR mr = new CompactorMR();
        if (this.runJobAsSelf(ci.runAs)) {
            mr.run(this.conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, this.msc, dir);
        } else {
            UserGroupInformation ugi = UserGroupInformation.createProxyUser((String)ci.runAs, (UserGroupInformation)UserGroupInformation.getLoginUser());
            ugi.doAs(() -> {
                mr.run(this.conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, this.msc, dir);
                return null;
            });
            try {
                FileSystem.closeAllForUGI((UserGroupInformation)ugi);
            }
            catch (IOException exception) {
                LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(), (Throwable)exception);
            }
        }
    }

    private void abortCompactionAndMarkFailed(CompactionInfo ci, long compactorTxnId, Throwable e) throws TException {
        if (ci != null) {
            ci.errorMessage = e.getMessage();
        }
        if (this.msc != null) {
            this.msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
            if (compactorTxnId != -1L) {
                this.msc.abortTxns(Collections.singletonList(compactorTxnId));
            }
        }
    }

    private void checkInterrupt() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException("Compaction execution is interrupted");
        }
    }

    static final class CompactionHeartbeater
    extends Thread {
        private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
        private final AtomicBoolean stop = new AtomicBoolean();
        private final long compactorTxnId;
        private final String tableName;
        private final HiveConf conf;
        private final long interval;

        public CompactionHeartbeater(long compactorTxnId, String tableName, HiveConf conf) {
            this.tableName = tableName;
            this.compactorTxnId = compactorTxnId;
            this.conf = conf;
            this.interval = MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) / 2L;
            this.setDaemon(true);
            this.setPriority(1);
            this.setName("CompactionHeartbeater-" + compactorTxnId);
        }

        @Override
        public void run() {
            try (IMetaStoreClient msc = null;){
                msc = HiveMetaStoreUtils.getHiveMetastoreClient(this.conf);
                LOG.debug("Heartbeating compaction transaction id {} for table: {}", (Object)this.compactorTxnId, (Object)this.tableName);
                while (!this.stop.get()) {
                    msc.heartbeat(this.compactorTxnId, 0L);
                    Thread.sleep(this.interval);
                }
            }
        }

        public void cancel() {
            if (!this.stop.get()) {
                LOG.debug("Successfully stop the heartbeating the transaction {}", (Object)this.compactorTxnId);
                this.stop.set(true);
            }
        }
    }

    static final class StatsUpdater {
        private static final Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);

        StatsUpdater() {
        }

        static void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
            try {
                if (!ci.isMajorCompaction()) {
                    return;
                }
                HiveConf statusUpdaterConf = new HiveConf(conf);
                statusUpdaterConf.unset("hive.txn.valid.txns");
                StringBuilder sb = new StringBuilder("analyze table ").append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName));
                if (ci.partName != null) {
                    sb.append(" partition(");
                    Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
                    for (Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
                        sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
                    }
                    sb.setLength(sb.length() - 1);
                    sb.append(")");
                }
                sb.append(" compute statistics");
                LOG.info(ci + ": running '" + sb + "'");
                if (compactionQueueName != null && compactionQueueName.length() > 0) {
                    statusUpdaterConf.set("tez.queue.name", compactionQueueName);
                }
                SessionState sessionState = DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
                DriverUtils.runOnDriver(statusUpdaterConf, userName, sessionState, sb.toString());
            }
            catch (Throwable t) {
                LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName + ") failed due to: " + t.getMessage(), t);
            }
        }
    }
}

