/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorMR;
import org.apache.hadoop.hive.ql.txn.compactor.QueryCompactor;
import org.apache.hadoop.hive.ql.txn.compactor.RemoteCompactorThread;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Worker
extends RemoteCompactorThread
implements MetaStoreThread {
    private static final String CLASS_NAME = Worker.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);
    private static final long SLEEP_TIME = 10000L;
    private String workerName;

    public static String hostname() {
        try {
            return InetAddress.getLocalHost().getHostName();
        }
        catch (UnknownHostException e) {
            LOG.error("Unable to resolve my host name " + e.getMessage());
            throw new RuntimeException(e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        LOG.info("Starting Worker thread");
        boolean genericStats = this.conf.getBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS);
        boolean mrStats = this.conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
        long timeout = this.conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS);
        ExecutorService executor = this.getTimeoutHandlingExecutor();
        try {
            do {
                boolean launchedJob;
                Future<Boolean> singleRun = executor.submit(() -> this.findNextCompactionAndExecute(genericStats, mrStats));
                try {
                    launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException te) {
                    LOG.info("Timeout during executing compaction", (Throwable)te);
                    singleRun.cancel(true);
                    executor.shutdownNow();
                    executor = this.getTimeoutHandlingExecutor();
                    launchedJob = true;
                }
                catch (ExecutionException e) {
                    LOG.info("Exception during executing compaction", (Throwable)e);
                    launchedJob = true;
                }
                catch (InterruptedException ie) {
                    launchedJob = true;
                }
                if (!launchedJob && !this.stop.get()) {
                    try {
                        Thread.sleep(10000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                LOG.info("Worker thread finished one loop.");
            } while (!this.stop.get());
        }
        finally {
            if (executor != null) {
                executor.shutdownNow();
            }
            if (this.msc != null) {
                this.msc.close();
            }
        }
    }

    @VisibleForTesting
    public void verifyTableIdHasNotChanged(CompactionInfo ci, Table originalTable) throws HiveException, MetaException {
        Table currentTable = this.resolveTable(ci);
        if (originalTable.getId() != currentTable.getId()) {
            throw new HiveException("Table " + originalTable.getDbName() + "." + originalTable.getTableName() + " id (" + currentTable.getId() + ") is not equal to its id when compaction started (" + originalTable.getId() + "). The table might have been dropped and recreated while compaction was running. Marking compaction as failed.");
        }
    }

    @Override
    public void init(AtomicBoolean stop) throws Exception {
        super.init(stop);
        this.workerName = this.getWorkerId();
        this.setName(this.workerName);
    }

    static boolean isEnoughToCompact(boolean isMajorCompaction, AcidDirectory dir, StorageDescriptor sd) {
        boolean isEnoughToCompact;
        int deltaCount = dir.getCurrentDirectories().size();
        int origCount = dir.getOriginalFiles().size();
        StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
        if (isMajorCompaction) {
            isEnoughToCompact = origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1;
        } else {
            boolean bl = isEnoughToCompact = deltaCount > 1;
            if (deltaCount == 2) {
                Map<String, Long> deltaByType = dir.getCurrentDirectories().stream().collect(Collectors.groupingBy(delta -> delta.isDeleteDelta() ? "delete_delta_" : "delta_", Collectors.counting()));
                isEnoughToCompact = deltaByType.size() != deltaCount;
                deltaInfo.append(" ").append(deltaByType);
            }
        }
        if (!isEnoughToCompact) {
            LOG.info("Not enough files in {} to compact; current base: {}, delta files: {}, originals: {}", new Object[]{sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount});
        }
        return isEnoughToCompact;
    }

    public static boolean needsCleaning(AcidDirectory dir, StorageDescriptor sd) {
        boolean needsJustCleaning;
        int numObsoleteDirs = dir.getObsolete().size();
        boolean bl = needsJustCleaning = numObsoleteDirs > 0;
        if (needsJustCleaning) {
            LOG.info("{} obsolete directories in {} found; marked for cleaning.", (Object)numObsoleteDirs, (Object)sd.getLocation());
        }
        return needsJustCleaning;
    }

    private ExecutorService getTimeoutHandlingExecutor() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread masterThread = Thread.currentThread();
            Thread t = new Thread(masterThread.getThreadGroup(), r, masterThread.getName() + "_executor");
            t.setDaemon(masterThread.isDaemon());
            t.setPriority(masterThread.getPriority());
            return t;
        });
    }

    /*
     * Exception decompiling
     */
    @VisibleForTesting
    protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, boolean collectMrStats) {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Tried to end blocks [13[TRYBLOCK]], but top level block is 42[CATCHBLOCK]
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.processEndingBlocks(Op04StructuredStatement.java:435)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:484)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    private void cleanupResultDirs(StorageDescriptor sd, ValidWriteIdList writeIds, CompactionType ctype, AcidDirectory dir) {
        Path resultDir = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, this.conf, ctype == CompactionType.MAJOR, false, false, dir);
        LOG.info("Deleting result directories created by the compactor:\n");
        try {
            FileSystem fs = resultDir.getFileSystem((Configuration)this.conf);
            LOG.info(resultDir.toString());
            fs.delete(resultDir, true);
            if (ctype == CompactionType.MINOR) {
                Path deleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(sd, writeIds, this.conf, false, true, false, dir);
                LOG.info(deleteDeltaDir.toString());
                fs.delete(deleteDeltaDir, true);
            }
        }
        catch (IOException ex) {
            LOG.error("Caught exception while cleaning result directories:", (Throwable)ex);
        }
    }

    private void failCompactionIfSetForTest() {
        if (this.conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && this.conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
            throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
        }
    }

    private void runCompactionViaMrJob(CompactionInfo ci, Table t, Partition p, StorageDescriptor sd, ValidCompactorWriteIdList tblValidWriteIds, StringBuilder jobName, AcidDirectory dir) throws IOException, InterruptedException {
        CompactorMR mr = new CompactorMR();
        if (this.runJobAsSelf(ci.runAs)) {
            mr.run(this.conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, this.msc, dir);
        } else {
            UserGroupInformation ugi = UserGroupInformation.createProxyUser((String)ci.runAs, (UserGroupInformation)UserGroupInformation.getLoginUser());
            ugi.doAs(() -> {
                mr.run(this.conf, jobName.toString(), t, p, sd, tblValidWriteIds, ci, this.msc, dir);
                return null;
            });
            try {
                FileSystem.closeAllForUGI((UserGroupInformation)ugi);
            }
            catch (IOException exception) {
                LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName(), (Throwable)exception);
            }
        }
    }

    private void markFailed(CompactionInfo ci, Throwable e) {
        if (ci == null) {
            LOG.warn("CompactionInfo client was null. Could not mark failed");
            return;
        }
        ci.errorMessage = e.getMessage();
        if (this.msc == null) {
            LOG.warn("Metastore client was null. Could not mark failed: {}", (Object)ci);
            return;
        }
        try {
            this.msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
        }
        catch (Throwable t) {
            LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", (Object)ci, (Object)t);
        }
    }

    private void checkInterrupt() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException("Compaction execution is interrupted");
        }
    }

    private String getWorkerId() {
        StringBuilder name = new StringBuilder(this.hostName);
        name.append("-");
        name.append(this.getId());
        return name.toString();
    }

    private /* synthetic */ Void lambda$findNextCompactionAndExecute$3(StorageDescriptor sd, ValidCompactorWriteIdList tblValidWriteIds, CompactionType ctype, AcidDirectory dir) throws Exception {
        this.cleanupResultDirs(sd, tblValidWriteIds, ctype, dir);
        return null;
    }

    private class CompactionTxn
    implements AutoCloseable {
        private long txnId = 0L;
        private TxnStatus status = TxnStatus.UNKNOWN;
        private boolean succeessfulCompaction = false;
        private ScheduledExecutorService heartbeatExecutor;

        private CompactionTxn() {
        }

        void open(CompactionInfo ci) throws TException {
            this.txnId = Worker.this.msc.openTxn(ci.runAs, TxnType.COMPACTION);
            this.status = TxnStatus.OPEN;
            this.heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setPriority(1).setDaemon(true).setNameFormat("CompactionTxnHeartbeater-" + this.txnId).build());
            long txnTimeout = MetastoreConf.getTimeVar(Worker.this.conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
            this.heartbeatExecutor.scheduleAtFixedRate(new CompactionHeartbeater(this, TxnUtils.getFullTableName(ci.dbname, ci.tableName), Worker.this.conf), txnTimeout / 4L, txnTimeout / 2L, TimeUnit.MILLISECONDS);
        }

        void wasSuccessful() {
            this.succeessfulCompaction = true;
        }

        @Override
        public void close() throws Exception {
            this.shutdownHeartbeater();
            if (this.status != TxnStatus.UNKNOWN) {
                if (this.succeessfulCompaction) {
                    this.commit();
                } else {
                    this.abort();
                }
            }
        }

        private void shutdownHeartbeater() {
            if (this.heartbeatExecutor != null) {
                this.heartbeatExecutor.shutdownNow();
                try {
                    if (!this.heartbeatExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                        LOG.warn("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", (Object)this);
                    }
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        long getTxnId() {
            return this.txnId;
        }

        public String toString() {
            return "txnId=" + this.txnId + " (TxnStatus: " + (Object)((Object)this.status) + ")";
        }

        private void commit() throws TException {
            if (this.status == TxnStatus.OPEN) {
                Worker.this.msc.commitTxn(this.txnId);
                this.status = TxnStatus.COMMITTED;
            }
        }

        private void abort() throws TException {
            if (this.status == TxnStatus.OPEN) {
                Worker.this.msc.abortTxns(Collections.singletonList(this.txnId));
                this.status = TxnStatus.ABORTED;
            }
        }
    }

    static final class CompactionHeartbeater
    implements Runnable {
        private static final Logger LOG = LoggerFactory.getLogger(CompactionHeartbeater.class);
        private final CompactionTxn compactionTxn;
        private final String tableName;
        private final HiveConf conf;

        public CompactionHeartbeater(CompactionTxn compactionTxn, String tableName, HiveConf conf) {
            this.tableName = Objects.requireNonNull(tableName);
            this.compactionTxn = Objects.requireNonNull(compactionTxn);
            this.conf = Objects.requireNonNull(conf);
        }

        @Override
        public void run() {
            LOG.debug("Heartbeating compaction transaction id {} for table: {}", (Object)this.compactionTxn, (Object)this.tableName);
            try (IMetaStoreClient msc = null;){
                msc = HiveMetaStoreUtils.getHiveMetastoreClient(this.conf);
                msc.heartbeat(this.compactionTxn.getTxnId(), 0L);
            }
        }
    }

    @VisibleForTesting
    @ThreadSafe
    static final class StatsUpdater {
        private static final Logger LOG = LoggerFactory.getLogger(StatsUpdater.class);

        StatsUpdater() {
        }

        static void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
            try {
                if (!ci.isMajorCompaction()) {
                    return;
                }
                HiveConf statusUpdaterConf = new HiveConf(conf);
                statusUpdaterConf.unset("hive.txn.valid.txns");
                StringBuilder sb = new StringBuilder("analyze table ").append(StatsUtils.getFullyQualifiedTableName(ci.dbname, ci.tableName));
                if (ci.partName != null) {
                    sb.append(" partition(");
                    Map<String, String> partitionColumnValues = Warehouse.makeEscSpecFromName(ci.partName);
                    for (Map.Entry<String, String> ent : partitionColumnValues.entrySet()) {
                        sb.append(ent.getKey()).append("='").append(ent.getValue()).append("',");
                    }
                    sb.setLength(sb.length() - 1);
                    sb.append(")");
                }
                sb.append(" compute statistics");
                LOG.info(ci + ": running '" + sb + "'");
                if (compactionQueueName != null && compactionQueueName.length() > 0) {
                    statusUpdaterConf.set("tez.queue.name", compactionQueueName);
                }
                SessionState sessionState = DriverUtils.setUpSessionState(statusUpdaterConf, userName, true);
                DriverUtils.runOnDriver(statusUpdaterConf, userName, sessionState, sb.toString());
            }
            catch (Throwable t) {
                LOG.error(ci + ": gatherStats(" + ci.dbname + "," + ci.tableName + "," + ci.partName + ") failed due to: " + t.getMessage(), t);
            }
        }
    }
}

