/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.txn.compactor;

import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.MetaStoreCompactorThread;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Cleaner
extends MetaStoreCompactorThread {
    private static final String CLASS_NAME = Cleaner.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);
    private long cleanerCheckInterval = 0L;
    private ReplChangeManager replChangeManager;
    private ExecutorService cleanerExecutor;

    @Override
    public void init(AtomicBoolean stop) throws Exception {
        super.init(stop);
        this.replChangeManager = ReplChangeManager.getInstance(this.conf);
        this.cleanerCheckInterval = this.conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
        this.cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(this.conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM), "Cleaner-executor-thread-%d");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            boolean metricsEnabled = MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.METRICS_ENABLED) && MetastoreConf.getBoolVar(this.conf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON);
            Counter failuresCounter = Metrics.getOrCreateCounter("compaction_cleaner_failure_counter");
            do {
                TxnStore.MutexAPI.LockHandle handle = null;
                long startedAt = -1L;
                try {
                    handle = this.txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name());
                    startedAt = System.currentTimeMillis();
                    long minOpenTxnId = this.txnHandler.findMinOpenTxnId();
                    ArrayList<CompletableFuture<Void>> cleanerList = new ArrayList<CompletableFuture<Void>>();
                    for (CompactionInfo compactionInfo : this.txnHandler.findReadyToClean()) {
                        cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() -> this.clean(compactionInfo, minOpenTxnId, metricsEnabled)), this.cleanerExecutor));
                    }
                    CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
                }
                catch (Throwable t) {
                    if (metricsEnabled && handle != null) {
                        failuresCounter.inc();
                    }
                    LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException((Throwable)t));
                }
                finally {
                    if (handle != null) {
                        handle.releaseLocks();
                    }
                }
                long elapsedTime = System.currentTimeMillis() - startedAt;
                if (elapsedTime >= this.cleanerCheckInterval || this.stop.get()) continue;
                Thread.sleep(this.cleanerCheckInterval - elapsedTime);
            } while (!this.stop.get());
        }
        catch (InterruptedException ie) {
            LOG.error("Compactor cleaner thread interrupted, exiting " + StringUtils.stringifyException((Throwable)ie));
        }
        finally {
            if (this.cleanerExecutor != null) {
                this.cleanerExecutor.shutdownNow();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
        LOG.info("Starting cleaning for " + ci);
        PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
        String cleanerMetric = "compaction_cleaner_cycle_" + (ci.type != null ? ci.type.toString().toLowerCase() : null);
        try {
            Table t;
            if (metricsEnabled) {
                perfLogger.PerfLogBegin(CLASS_NAME, cleanerMetric);
            }
            if ((t = this.resolveTable(ci)) == null) {
                LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." + Cleaner.idWatermark(ci));
                this.txnHandler.markCleaned(ci);
                return;
            }
            Partition p = null;
            if (ci.partName != null && (p = this.resolvePartition(ci)) == null) {
                LOG.info("Unable to find partition " + ci.getFullPartitionName() + ", assuming it was dropped." + Cleaner.idWatermark(ci));
                this.txnHandler.markCleaned(ci);
                return;
            }
            StorageDescriptor sd = this.resolveStorageDescriptor(t, p);
            String location = sd.getLocation();
            ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(this.txnHandler.getOpenTxns(), minOpenTxnGLB);
            this.conf.set("hive.txn.valid.txns", validTxnList.writeToString());
            ValidReaderWriteIdList validWriteIdList = this.getValidCleanerWriteIdList(ci, t, validTxnList);
            Ref<Boolean> removedFiles = Ref.from(false);
            if (this.runJobAsSelf(ci.runAs)) {
                removedFiles.value = this.removeFiles(location, validWriteIdList, ci);
            } else {
                LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
                UserGroupInformation ugi = UserGroupInformation.createProxyUser((String)ci.runAs, (UserGroupInformation)UserGroupInformation.getLoginUser());
                try {
                    ugi.doAs(() -> {
                        removedFiles.value = this.removeFiles(location, validWriteIdList, ci);
                        return null;
                    });
                }
                finally {
                    try {
                        FileSystem.closeAllForUGI((UserGroupInformation)ugi);
                    }
                    catch (IOException exception) {
                        LOG.error("Could not clean up file-system handles for UGI: " + ugi + " for " + ci.getFullPartitionName() + Cleaner.idWatermark(ci), (Throwable)exception);
                    }
                }
            }
            if (((Boolean)removedFiles.value).booleanValue()) {
                this.txnHandler.markCleaned(ci);
            } else {
                LOG.warn("No files were removed. Leaving queue entry " + ci + " in ready for cleaning state.");
            }
        }
        catch (Exception e) {
            LOG.error("Caught exception when cleaning, unable to complete cleaning of " + ci + " " + StringUtils.stringifyException((Throwable)e));
            ci.errorMessage = e.getMessage();
            this.txnHandler.markFailed(ci);
        }
        finally {
            if (metricsEnabled) {
                perfLogger.PerfLogEnd(CLASS_NAME, cleanerMetric);
            }
        }
    }

    private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, Table t, ValidTxnList validTxnList) throws NoSuchTxnException, MetaException {
        List<String> tblNames = Collections.singletonList(TxnUtils.getFullTableName(t.getDbName(), t.getTableName()));
        GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
        request.setValidTxnList(validTxnList.writeToString());
        GetValidWriteIdsResponse rsp = this.txnHandler.getValidWriteIds(request);
        assert (rsp != null && rsp.getTblValidWriteIdsSize() == 1);
        ValidReaderWriteIdList validWriteIdList = TxnUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
        if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
            validWriteIdList = validWriteIdList.updateHighWatermark(ci.highestWriteId);
        }
        return validWriteIdList;
    }

    private static String idWatermark(CompactionInfo ci) {
        return " id=" + ci.id;
    }

    private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci) throws IOException, NoSuchObjectException, MetaException {
        Path path = new Path(location);
        FileSystem fs = path.getFileSystem((Configuration)this.conf);
        Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
        AcidDirectory dir = AcidUtils.getAcidState(fs, path, this.conf, writeIdList, Ref.from(false), false, dirSnapshots);
        List<Path> obsoleteDirs = this.getObsoleteDirs(dir);
        List<Path> deleted = this.remove(location, ci, obsoleteDirs, true, fs);
        this.conf.set("hive.txn.valid.txns", new ValidReadTxnList().toString());
        dir = AcidUtils.getAcidState(fs, path, this.conf, new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE), Ref.from(false), false, dirSnapshots);
        List remained = ListUtils.subtract(this.getObsoleteDirs(dir), deleted);
        if (!remained.isEmpty()) {
            LOG.warn(Cleaner.idWatermark(ci) + " Remained " + remained.size() + " obsolete directories from " + location + ". " + this.getDebugInfo(remained));
            return false;
        }
        LOG.debug(Cleaner.idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
        return true;
    }

    private List<Path> getObsoleteDirs(AcidDirectory dir) {
        List<Path> obsoleteDirs = dir.getObsolete();
        obsoleteDirs.addAll(dir.getAbortedDirectories());
        return obsoleteDirs;
    }

    private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs) throws NoSuchObjectException, MetaException, IOException {
        ArrayList<Path> deleted = new ArrayList<Path>();
        if (paths.size() < 1) {
            return deleted;
        }
        LOG.info(Cleaner.idWatermark(ci) + " About to remove " + paths.size() + " obsolete directories from " + location + ". " + this.getDebugInfo(paths));
        Database db = HiveMetaStore.HMSHandler.getMSForConf(this.conf).getDatabase(MetaStoreUtils.getDefaultCatalog(this.conf), ci.dbname);
        Table table = HiveMetaStore.HMSHandler.getMSForConf(this.conf).getTable(MetaStoreUtils.getDefaultCatalog(this.conf), ci.dbname, ci.tableName);
        for (Path dead : paths) {
            LOG.debug("Going to delete path " + dead.toString());
            if (ReplChangeManager.shouldEnableCm(db, table)) {
                this.replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
            }
            if (!FileUtils.moveToTrash(fs, dead, this.conf, ifPurge)) continue;
            deleted.add(dead);
        }
        return deleted;
    }

    private String getDebugInfo(List<Path> paths) {
        return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
    }
}

