/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.txn.compactor.metrics;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.common.metrics.MetricsMBeanImpl;
import org.apache.hadoop.hive.common.metrics.common.Metrics;
import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hive.common.util.Ref;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeltaFilesMetricReporter {
    private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
    public static final String OBJECT_NAME_PREFIX = "metrics:type=compaction,name=";
    public static final String JOB_CONF_DELTA_FILES_METRICS_METADATA = "delta.files.metrics.metadata";
    public static final char ENTRY_SEPARATOR = ';';
    public static final String KEY_VALUE_SEPARATOR = "->";
    private static long lastSuccessfulLoggingTime = 0L;
    private String hiveEntitySeparator;
    private Cache<String, Integer> deltaCache;
    private Cache<String, Integer> smallDeltaCache;
    private Cache<String, Integer> obsoleteDeltaCache;
    private MetricsMBeanImpl deltaObject;
    private MetricsMBeanImpl smallDeltaObject;
    private MetricsMBeanImpl obsoleteDeltaObject;
    private List<ObjectName> registeredObjects = new ArrayList<ObjectName>();
    private BlockingQueue<Pair<String, Integer>> deltaTopN;
    private BlockingQueue<Pair<String, Integer>> smallDeltaTopN;
    private BlockingQueue<Pair<String, Integer>> obsoleteDeltaTopN;
    private ScheduledExecutorService executorService;

    private DeltaFilesMetricReporter() {
    }

    public static DeltaFilesMetricReporter getInstance() {
        return InstanceHolder.instance;
    }

    public static synchronized void init(HiveConf conf) throws Exception {
        DeltaFilesMetricReporter.getInstance().configure(conf);
    }

    private void configure(HiveConf conf) throws Exception {
        long reportingInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_REPORTING_INTERVAL, TimeUnit.SECONDS);
        this.hiveEntitySeparator = conf.getVar(HiveConf.ConfVars.HIVE_ENTITY_SEPARATOR);
        this.initCachesForMetrics(conf);
        this.initObjectsForMetrics();
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("DeltaFilesMetricReporter %d").build();
        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.executorService.scheduleAtFixedRate(new ReportingTask(), 0L, reportingInterval, TimeUnit.SECONDS);
        LOG.info("Started DeltaFilesMetricReporter thread");
    }

    public void submit(TezCounters counters, Set<ReadEntity> inputs) {
        try {
            this.updateMetrics(DeltaFilesMetricType.NUM_OBSOLETE_DELTAS, this.obsoleteDeltaCache, this.obsoleteDeltaTopN, counters, inputs);
            this.updateMetrics(DeltaFilesMetricType.NUM_DELTAS, this.deltaCache, this.deltaTopN, counters, inputs);
            this.updateMetrics(DeltaFilesMetricType.NUM_SMALL_DELTAS, this.smallDeltaCache, this.smallDeltaTopN, counters, inputs);
        }
        catch (Exception e) {
            LOG.warn("Caught exception while trying to update delta metrics cache. Invalidating cache", (Throwable)e);
            try {
                this.obsoleteDeltaCache.invalidateAll();
                this.deltaCache.invalidateAll();
                this.smallDeltaCache.invalidateAll();
            }
            catch (Exception x) {
                LOG.warn("Caught exception while trying to invalidate cache", (Throwable)x);
            }
        }
    }

    private void updateMetrics(DeltaFilesMetricType metric, Cache<String, Integer> cache, Queue<Pair<String, Integer>> topN, TezCounters counters, Set<ReadEntity> inputs) {
        ArrayList<String> inputPaths = Lists.newArrayList();
        if (inputs != null) {
            inputs.stream().map(readEntity -> readEntity.getName().split(this.hiveEntitySeparator)).forEach(inputNames -> {
                String dbName = inputNames[0];
                String tableName = inputNames[1];
                String partitionName = ((String[])inputNames).length > 2 ? inputNames[2] : null;
                inputPaths.add(DeltaFilesMetricReporter.getDeltaCountKey(dbName, tableName, partitionName));
            });
        }
        CounterGroup group = (CounterGroup)counters.getGroup(metric.value);
        for (String key : inputPaths) {
            TezCounter counter;
            Integer prev = cache.getIfPresent(key);
            if (prev == null || (counter = counters.findCounter(group.getName(), key)) == null || counter.getValue() != 0L && counter.getValue() == (long)prev.intValue()) continue;
            cache.invalidate(key);
        }
        for (TezCounter counter : group) {
            if (counter.getValue() == 0L) continue;
            topN.add(Pair.of(counter.getName(), (int)counter.getValue()));
            cache.put(counter.getName(), (int)counter.getValue());
        }
    }

    public static void mergeDeltaFilesStats(AcidDirectory dir, long checkThresholdInSec, float deltaPctThreshold, int deltasThreshold, int obsoleteDeltasThreshold, int maxCacheSize, EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) throws IOException {
        try {
            long baseSize = DeltaFilesMetricReporter.getBaseSize(dir);
            int numObsoleteDeltas = 0;
            int numDeltas = 0;
            int numSmallDeltas = 0;
            long now = new Date().getTime();
            for (AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
                if (now - DeltaFilesMetricReporter.getModificationTime(delta, dir.getFs()) < checkThresholdInSec * 1000L) continue;
                ++numDeltas;
                long deltaSize = DeltaFilesMetricReporter.getDirSize(delta, dir.getFs());
                if (baseSize == 0L || !((float)deltaSize / (float)baseSize < deltaPctThreshold)) continue;
                ++numSmallDeltas;
            }
            DeltaFilesMetricReporter.logDeltaDirMetrics(dir, conf, numObsoleteDeltas, numDeltas, numSmallDeltas);
            String serializedMetadata = conf.get(JOB_CONF_DELTA_FILES_METRICS_METADATA);
            if (serializedMetadata == null) {
                LOG.warn("delta.files.metrics.metadata is missing from config. Delta metrics can't be updated.");
                return;
            }
            HashMap pathToMetadata = new HashMap();
            if ((pathToMetadata = (HashMap)SerializationUtilities.deserializeObject(serializedMetadata, pathToMetadata.getClass())) == null) {
                LOG.warn("Delta metrics can't be updated since the metadata is null.");
                return;
            }
            DeltaFilesMetadata metadata = (DeltaFilesMetadata)pathToMetadata.get(dir.getPath());
            DeltaFilesMetricReporter.filterAndAddToDeltaFilesStats(DeltaFilesMetricType.NUM_DELTAS, numDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize);
            DeltaFilesMetricReporter.filterAndAddToDeltaFilesStats(DeltaFilesMetricType.NUM_OBSOLETE_DELTAS, numObsoleteDeltas, obsoleteDeltasThreshold, deltaFilesStats, metadata, maxCacheSize);
            DeltaFilesMetricReporter.filterAndAddToDeltaFilesStats(DeltaFilesMetricType.NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, deltaFilesStats, metadata, maxCacheSize);
        }
        catch (Throwable t) {
            LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t);
        }
    }

    private static void filterAndAddToDeltaFilesStats(DeltaFilesMetricType type, int deltaCount, int deltasThreshold, EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, DeltaFilesMetadata metadata, int maxCacheSize) {
        if (deltaCount > deltasThreshold) {
            Pair<String, Integer> lowest;
            Queue<Pair<String, Integer>> pairQueue = deltaFilesStats.get((Object)type);
            if (pairQueue != null && pairQueue.size() == maxCacheSize && (lowest = pairQueue.peek()) != null && deltaCount > lowest.getValue()) {
                pairQueue.poll();
            }
            if (pairQueue == null || pairQueue.size() < maxCacheSize) {
                String deltaCountKey = DeltaFilesMetricReporter.getDeltaCountKey(metadata.dbName, metadata.tableName, metadata.partitionName);
                deltaFilesStats.computeIfAbsent(type, v -> new PriorityQueue<Pair<String, Integer>>(maxCacheSize, DeltaFilesMetricReporter.getComparator())).add(Pair.of(deltaCountKey, deltaCount));
            }
        }
    }

    private static String getDeltaCountKey(String dbName, String tableName, String partitionName) {
        StringBuilder key = new StringBuilder();
        if (dbName == null || dbName.isEmpty()) {
            key.append(tableName);
        } else {
            key.append(dbName).append(".").append(tableName);
        }
        if (partitionName != null && !partitionName.isEmpty()) {
            key.append("/");
            if (partitionName.startsWith("{") && partitionName.endsWith("}")) {
                key.append(partitionName, 1, partitionName.length() - 1);
            } else {
                key.append(partitionName);
            }
        }
        return key.toString();
    }

    private static void logDeltaDirMetrics(AcidDirectory dir, Configuration conf, int numObsoleteDeltas, int numDeltas, int numSmallDeltas) {
        long loggerFrequency = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACID_METRICS_LOGGER_FREQUENCY, TimeUnit.MILLISECONDS);
        if (loggerFrequency <= 0L) {
            return;
        }
        long currentTime = System.currentTimeMillis();
        if (lastSuccessfulLoggingTime == 0L || currentTime >= lastSuccessfulLoggingTime + loggerFrequency) {
            lastSuccessfulLoggingTime = currentTime;
            if (numDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD)) {
                LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " active delta directories. This can cause performance degradation.");
            }
            if (numObsoleteDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_OBSOLETE_DELTA_DIR_THRESHOLD)) {
                LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " obsolete delta directories. This can indicate compaction cleaner issues.");
            }
            if (numSmallDeltas >= HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_SMALL_DELTA_DIR_THRESHOLD)) {
                LOG.warn("Directory " + dir.getPath() + " contains " + numDeltas + " small delta directories. This can indicate performance degradation and there might be a problem with your streaming setup.");
            }
        }
    }

    public static void createCountersForAcidMetrics(TezCounters tezCounters, JobConf jobConf) {
        if (HiveConf.getBoolVar((Configuration)jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf.getBoolVar((Configuration)jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
            Arrays.stream(DeltaFilesMetricType.values()).filter(type -> jobConf.get(type.name()) != null).forEach(type -> Splitter.on(';').withKeyValueSeparator(KEY_VALUE_SEPARATOR).split(jobConf.get(type.name())).forEach((path, cnt) -> tezCounters.findCounter(((DeltaFilesMetricType)type).value, path).setValue(Long.parseLong(cnt))));
        }
    }

    public static void addAcidMetricsToConfObj(EnumMap<DeltaFilesMetricType, Queue<Pair<String, Integer>>> deltaFilesStats, Configuration conf) {
        try {
            deltaFilesStats.forEach((type, value) -> conf.set(type.name(), Joiner.on(';').withKeyValueSeparator(KEY_VALUE_SEPARATOR).join((Iterable<? extends Map.Entry<?, ?>>)value)));
        }
        catch (Exception e) {
            LOG.warn("Couldn't add Delta metrics to conf object", (Throwable)e);
        }
    }

    public static void backPropagateAcidMetrics(JobConf jobConf, Configuration conf) {
        if (HiveConf.getBoolVar((Configuration)jobConf, HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED) && MetastoreConf.getBoolVar((Configuration)jobConf, MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
            try {
                Arrays.stream(DeltaFilesMetricType.values()).filter(type -> conf.get(type.name()) != null).forEach(type -> jobConf.set(type.name(), conf.get(type.name())));
            }
            catch (Exception e) {
                LOG.warn("Couldn't back propagate Delta metrics to jobConf object", (Throwable)e);
            }
        }
    }

    private static long getBaseSize(AcidDirectory dir) throws IOException {
        long baseSize = 0L;
        if (dir.getBase() != null) {
            baseSize = DeltaFilesMetricReporter.getDirSize(dir.getBase(), dir.getFs());
        } else {
            for (HadoopShims.HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
                baseSize += origStat.getFileStatus().getLen();
            }
        }
        return baseSize;
    }

    private static long getModificationTime(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
        return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus).mapToLong(FileStatus::getModificationTime).max().orElse(new Date().getTime());
    }

    private static long getDirSize(AcidUtils.ParsedDirectory dir, FileSystem fs) throws IOException {
        return dir.getFiles(fs, Ref.from(false)).stream().map(HadoopShims.HdfsFileStatusWithId::getFileStatus).mapToLong(FileStatus::getLen).sum();
    }

    private void initObjectsForMetrics() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        this.obsoleteDeltaObject = new MetricsMBeanImpl();
        this.registeredObjects.add(mbs.registerMBean(this.obsoleteDeltaObject, new ObjectName("metrics:type=compaction,name=compaction_num_obsolete_deltas")).getObjectName());
        this.deltaObject = new MetricsMBeanImpl();
        this.registeredObjects.add(mbs.registerMBean(this.deltaObject, new ObjectName("metrics:type=compaction,name=compaction_num_active_deltas")).getObjectName());
        this.smallDeltaObject = new MetricsMBeanImpl();
        this.registeredObjects.add(mbs.registerMBean(this.smallDeltaObject, new ObjectName("metrics:type=compaction,name=compaction_num_small_deltas")).getObjectName());
    }

    private void initCachesForMetrics(HiveConf conf) {
        int maxCacheSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE);
        long duration = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_TXN_ACID_METRICS_CACHE_DURATION, TimeUnit.SECONDS);
        this.deltaTopN = new PriorityBlockingQueue<Pair<String, Integer>>(maxCacheSize, DeltaFilesMetricReporter.getComparator());
        this.smallDeltaTopN = new PriorityBlockingQueue<Pair<String, Integer>>(maxCacheSize, DeltaFilesMetricReporter.getComparator());
        this.obsoleteDeltaTopN = new PriorityBlockingQueue<Pair<String, Integer>>(maxCacheSize, DeltaFilesMetricReporter.getComparator());
        this.deltaCache = CacheBuilder.newBuilder().expireAfterWrite(duration, TimeUnit.SECONDS).removalListener(notification -> this.removalPredicate(this.deltaTopN, notification)).softValues().build();
        this.smallDeltaCache = CacheBuilder.newBuilder().expireAfterWrite(duration, TimeUnit.SECONDS).removalListener(notification -> this.removalPredicate(this.smallDeltaTopN, notification)).softValues().build();
        this.obsoleteDeltaCache = CacheBuilder.newBuilder().expireAfterWrite(duration, TimeUnit.SECONDS).removalListener(notification -> this.removalPredicate(this.obsoleteDeltaTopN, notification)).softValues().build();
    }

    private static Comparator<Pair<String, Integer>> getComparator() {
        return Comparator.comparing(Pair::getValue);
    }

    private void removalPredicate(BlockingQueue<Pair<String, Integer>> topN, RemovalNotification notification) {
        topN.removeIf(item -> ((String)item.getKey()).equals(notification.getKey()));
    }

    public static void close() {
        if (DeltaFilesMetricReporter.getInstance() != null) {
            DeltaFilesMetricReporter.getInstance().shutdown();
        }
    }

    private void shutdown() {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        for (ObjectName oname : this.registeredObjects) {
            if (!mbs.isRegistered(oname)) continue;
            try {
                mbs.unregisterMBean(oname);
            }
            catch (Exception e) {
                LOG.error(e.getMessage());
            }
        }
    }

    public static class DeltaFilesMetadata
    implements Serializable {
        public String dbName;
        public String tableName;
        public String partitionName;
    }

    private final class ReportingTask
    implements Runnable {
        private ReportingTask() {
        }

        @Override
        public void run() {
            Metrics metrics = MetricsFactory.getInstance();
            if (metrics != null) {
                DeltaFilesMetricReporter.this.obsoleteDeltaCache.cleanUp();
                DeltaFilesMetricReporter.this.obsoleteDeltaObject.updateAll(DeltaFilesMetricReporter.this.obsoleteDeltaCache.asMap());
                DeltaFilesMetricReporter.this.deltaCache.cleanUp();
                DeltaFilesMetricReporter.this.deltaObject.updateAll(DeltaFilesMetricReporter.this.deltaCache.asMap());
                DeltaFilesMetricReporter.this.smallDeltaCache.cleanUp();
                DeltaFilesMetricReporter.this.smallDeltaObject.updateAll(DeltaFilesMetricReporter.this.smallDeltaCache.asMap());
            }
        }
    }

    private static class InstanceHolder {
        public static DeltaFilesMetricReporter instance = new DeltaFilesMetricReporter();

        private InstanceHolder() {
        }
    }

    public static enum DeltaFilesMetricType {
        NUM_OBSOLETE_DELTAS("HIVE_ACID_NUM_OBSOLETE_DELTAS"),
        NUM_DELTAS("HIVE_ACID_NUM_DELTAS"),
        NUM_SMALL_DELTAS("HIVE_ACID_NUM_SMALL_DELTAS");

        private final String value;

        private DeltaFilesMetricType(String value) {
            this.value = value;
        }

        public String toString() {
            return this.value;
        }
    }
}

