/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
import org.apache.parquet.Strings;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CompactorMR {
    // Fully-qualified name of this class; used as the logger name below.
    private static final String CLASS_NAME = CompactorMR.class.getName();
    private static final Logger LOG = LoggerFactory.getLogger((String)CLASS_NAME);

    // ---- JobConf keys used to pass compaction parameters to the classes nested in this file ----
    // Input format class name from the storage descriptor; read by CompactorMap.map.
    private static final String INPUT_FORMAT_CLASS_NAME = "hive.compactor.input.format.class.name";
    // Output format class name from the storage descriptor; read when creating record writers.
    private static final String OUTPUT_FORMAT_CLASS_NAME = "hive.compactor.output.format.class.name";
    // Temporary directory (see generateTmpPath) that writers target; the output committer moves its contents.
    private static final String TMP_LOCATION = "hive.compactor.input.tmp.dir";
    // Storage descriptor location; destination of the output committer's renames.
    private static final String FINAL_LOCATION = "hive.compactor.input.dir";
    // Smallest min-write-id / largest max-write-id over the deltas handed to launchCompactionJob.
    private static final String MIN_TXN = "hive.compactor.txn.min";
    private static final String MAX_TXN = "hive.compactor.txn.max";
    // True when the launched job is a MAJOR compaction.
    private static final String IS_MAJOR = "hive.compactor.is.major";
    // Mirrors StorageDescriptor.isCompressed().
    private static final String IS_COMPRESSED = "hive.compactor.is.compressed";
    // Table parameters serialized through StringableMap.toString().
    private static final String TABLE_PROPS = "hive.compactor.table.props";
    // Bucket count from the storage descriptor; CompactorInputFormat treats -1 (or absent) as "not bucketed".
    private static final String NUM_BUCKETS = "bucket_count";
    // Base directory, set only when launchCompactionJob receives a non-null baseDir.
    private static final String BASE_DIR = "hive.compactor.base.dir";
    // Delta directories serialized through StringableList.toString().
    private static final String DELTA_DIRS = "hive.compactor.delta.dirs";
    // Every directory/file the input format inspects, serialized through StringableList.toString().
    private static final String DIRS_TO_SEARCH = "hive.compactor.dirs.to.search";

    // Name fragment of the temporary directory created under the table/partition location.
    private static final String TMPDIR = "_tmp";
    // Keys with this prefix in the compaction request properties override table properties.
    private static final String TBLPROPS_PREFIX = "tblprops.";
    // Keys with this prefix (in table or request properties) are copied into the JobConf without the prefix.
    private static final String COMPACTOR_PREFIX = "compactor.";

    // Last submitted JobConf; only recorded when HIVE_IN_TEST is enabled (see launchCompactionJob).
    private JobConf mrJob;

    /**
     * Builds the JobConf shared by every compaction job started from this class.
     *
     * <p>The job is map-only, uses {@link CompactorInputFormat}, {@link CompactorMap} and
     * {@link CompactorOutputCommitter}, and carries the storage descriptor details, the table
     * properties, the valid write-id list and the column names/types in its configuration.
     * "compactor."-prefixed table parameters are applied first; overrides from
     * {@code ci.properties}, when present, are applied afterwards.
     *
     * @param conf     configuration the JobConf is derived from
     * @param jobName  name given to the job
     * @param t        table being compacted; its parameters and columns are read
     * @param sd       storage descriptor supplying location, formats, compression and bucket count
     * @param writeIds valid write ids, stored under "hive.txn.valid.writeids"
     * @param ci       compaction request; supplies optional property overrides and the queue name
     * @return the configured job
     */
    @VisibleForTesting
    public JobConf createBaseJobConf(HiveConf conf, String jobName, Table t, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) {
        JobConf job = new JobConf((Configuration)conf);

        // Job plumbing: map-only, no real output, custom input format and committer.
        job.setJobName(jobName);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setJarByClass(CompactorMR.class);
        LOG.debug("User jar set to " + job.getJar());
        job.setMapperClass(CompactorMap.class);
        job.setNumReduceTasks(0);
        job.setInputFormat(CompactorInputFormat.class);
        job.setOutputFormat(NullOutputFormat.class);
        job.setOutputCommitter(CompactorOutputCommitter.class);

        // Parameters consumed by the nested mapper / input format / committer.
        job.set(FINAL_LOCATION, sd.getLocation());
        job.set(TMP_LOCATION, generateTmpPath(sd));
        job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
        job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
        job.setBoolean(IS_COMPRESSED, sd.isCompressed());
        job.set(TABLE_PROPS, new StringableMap(t.getParameters()).toString());
        job.setInt(NUM_BUCKETS, sd.getNumBuckets());
        job.set("hive.txn.valid.writeids", writeIds.toString());

        // Table-level overrides first, then request-level overrides.
        overrideMRProps(job, t.getParameters());
        if (ci.properties != null) {
            overrideTblProps(job, t.getParameters(), ci.properties);
        }

        String queueName = CompactorUtil.getCompactorJobQueueName(conf, ci, t);
        if (!queueName.isEmpty()) {
            job.setQueueName(queueName);
        }

        setColumnTypes(job, t.getSd().getCols());
        job.setBoolean("mapreduce.map.speculative", false);
        AcidUtils.setAcidOperationalProperties((Configuration)job, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
        return job;
    }

    /**
     * Applies the overrides carried by a compaction request.
     *
     * <p>{@code properties} is parsed with {@link StringableMap}. Entries whose key starts with
     * "compactor." are copied into the job (see {@link #overrideMRProps}); entries whose key
     * starts with "tblprops." are written into {@code tblproperties} with the prefix removed.
     * The resulting table properties are then re-serialized into the job under TABLE_PROPS.
     *
     * @param job           job configuration to update
     * @param tblproperties table properties; modified in place by this method
     * @param properties    serialized override map from the compaction request
     */
    private void overrideTblProps(JobConf job, Map<String, String> tblproperties, String properties) {
        StringableMap stringableMap = new StringableMap(properties);
        overrideMRProps(job, stringableMap);
        for (Map.Entry<String, String> entry : stringableMap.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(TBLPROPS_PREFIX)) {
                // Strip the prefix by its actual length rather than a hard-coded offset.
                tblproperties.put(key.substring(TBLPROPS_PREFIX.length()), entry.getValue());
            }
        }
        job.set(TABLE_PROPS, new StringableMap(tblproperties).toString());
    }

    /**
     * Copies every entry whose key starts with "compactor." into the job configuration,
     * using the key with that prefix removed.
     *
     * @param job        job configuration to update
     * @param properties candidate overrides; entries without the prefix are ignored
     */
    private void overrideMRProps(JobConf job, Map<String, String> properties) {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(COMPACTOR_PREFIX)) {
                // Strip the prefix by its actual length rather than a hard-coded offset.
                job.set(key.substring(COMPACTOR_PREFIX.length()), entry.getValue());
            }
        }
    }

    /**
     * Runs the compaction described by {@code ci} as one or more MapReduce jobs.
     *
     * <p>If the number of current delta directories exceeds COMPACTOR_MAX_NUM_DELTA, the deltas are
     * first compacted in chunks by intermediate MINOR jobs and the directory state is read again.
     * The requested compaction is then launched over the (possibly refreshed) state. Nothing is
     * launched for the requested compaction when there are neither deltas nor original files.
     *
     * @param conf     Hive configuration
     * @param jobName  base job name; intermediate jobs append "_" and a sub id
     * @param t        table being compacted
     * @param p        partition; not referenced in this method
     * @param sd       storage descriptor of the table/partition
     * @param writeIds valid write ids for the compaction
     * @param ci       compaction request
     * @param msc      metastore client, passed on to record the Hadoop job id
     * @param dir      ACID directory state of the location being compacted
     * @throws IOException if a launched job fails or cannot be submitted
     */
    public void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci, IMetaStoreClient msc, AcidDirectory dir) throws IOException {
        JobConf job = this.createBaseJobConf(conf, jobName, t, sd, writeIds, ci);
        List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
        int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
        if (parsedDeltas.size() > maxDeltasToHandle) {
            // Too many deltas for a single job: reduce them with intermediate MINOR compactions first.
            LOG.warn(parsedDeltas.size() + " delta files found for " + ci.getFullPartitionName() + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, especially if this message repeats.  Check that compaction is running properly.  Check for any runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
            int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
            parsedDeltas.sort(AcidUtils.ParsedDeltaLight::compareTo);
            int start = 0;
            int end = maxDeltasToHandle;
            for (int jobSubId = 0; jobSubId < numMinorCompactions; ++jobSubId) {
                // Pull the chunk boundary back so that deltas sharing the same min and max write id
                // are not split across two chunks.
                while (end > 0 && end < parsedDeltas.size() && parsedDeltas.get(end).getMinWriteId() == parsedDeltas.get(end - 1).getMinWriteId() && parsedDeltas.get(end).getMaxWriteId() == parsedDeltas.get(end - 1).getMaxWriteId()) {
                    --end;
                }
                List<AcidUtils.ParsedDelta> split = parsedDeltas.subList(start, end);
                start = end;
                end = start + maxDeltasToHandle;
                // Each chunk gets its own JobConf; no base dir, no extra dirs to search, obsolete count unknown (-1).
                JobConf jobMinorCompact = this.createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, writeIds, ci);
                this.launchCompactionJob(jobMinorCompact, null, CompactionType.MINOR, null, split, split.size(), -1, conf, msc, ci.id, jobName);
            }
            // Re-read the directory state now that the intermediate jobs have run.
            dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false);
        }
        StringableList dirsToSearch = new StringableList();
        Path baseDir = null;
        if (ci.isMajorCompaction()) {
            baseDir = dir.getBaseDirectory();
            if (baseDir == null) {
                // No base directory: fall back to original files, using the location itself as base.
                List<HadoopShims.HdfsFileStatusWithId> originalFiles = dir.getOriginalFiles();
                if (originalFiles != null && originalFiles.size() != 0) {
                    for (HadoopShims.HdfsFileStatusWithId stat : originalFiles) {
                        Path path = stat.getFileStatus().getPath();
                        dirsToSearch.add(path);
                        LOG.debug("Adding original file " + path + " to dirs to search");
                    }
                    baseDir = new Path(sd.getLocation());
                }
            } else {
                LOG.debug("Adding base directory " + baseDir + " to dirs to search");
                dirsToSearch.add(baseDir);
            }
        }
        // NOTE(review): parsedDeltas is the list captured before any re-read of 'dir' above, and
        // getOriginalFiles() is not null-checked here although it is at the earlier call — confirm intended.
        if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) {
            String minOpenInfo = ".";
            if (writeIds.getMinOpenWriteId() != null) {
                minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) + ".  Compaction cannot compact above this writeId";
            }
            LOG.error("No delta files or original files found to compact in " + sd.getLocation() + " for compactionId=" + ci.id + minOpenInfo);
            return;
        }
        this.launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(), dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName);
    }

    /**
     * Builds a unique temporary path string under the storage descriptor's location, of the
     * form {@code <location>/_tmp_<random UUID>}.
     *
     * @param sd storage descriptor whose location is used as the parent
     * @return the temporary path as a string
     */
    private static String generateTmpPath(StorageDescriptor sd) {
        StringBuilder tmpPath = new StringBuilder();
        tmpPath.append(sd.getLocation())
            .append("/")
            .append(TMPDIR)
            .append("_")
            .append(UUID.randomUUID().toString());
        return tmpPath.toString();
    }

    /**
     * Finishes configuring {@code job} for one compaction pass, submits it and waits for completion.
     *
     * <p>Every delta path is appended to {@code dirsToSearch} and recorded under DELTA_DIRS; the
     * smallest min write id and largest max write id over the deltas are stored under MIN_TXN and
     * MAX_TXN. Delegation tokens are obtained for all involved paths before submission. The
     * Hadoop job id is reported to the metastore on a best-effort basis (a failure is only logged).
     *
     * @param job               job configuration to complete and submit
     * @param baseDir           base directory, or {@code null} if there is none
     * @param compactionType    type of compaction; MAJOR sets IS_MAJOR to true
     * @param dirsToSearch      directories already selected for searching; may be {@code null}
     * @param parsedDeltas      deltas to compact
     * @param curDirNumber      number of current delta directories (logged only)
     * @param obsoleteDirNumber number of obsolete delta directories (logged only)
     * @param hiveConf          Hive configuration; when HIVE_IN_TEST is set the job is remembered
     * @param msc               metastore client used to record the Hadoop job id
     * @param id                compaction id
     * @param jobName           name used in the failure message
     * @throws IOException if the job cannot be submitted or does not complete successfully
     */
    private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType, StringableList dirsToSearch, List<AcidUtils.ParsedDelta> parsedDeltas, int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf, IMetaStoreClient msc, long id, String jobName) throws IOException {
        job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
        if (dirsToSearch == null) {
            dirsToSearch = new StringableList();
        }
        StringableList deltaDirs = new StringableList();
        long minTxn = Long.MAX_VALUE;
        long maxTxn = Long.MIN_VALUE;
        for (AcidUtils.ParsedDelta delta : parsedDeltas) {
            LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
            dirsToSearch.add(delta.getPath());
            deltaDirs.add(delta.getPath());
            minTxn = Math.min(minTxn, delta.getMinWriteId());
            maxTxn = Math.max(maxTxn, delta.getMaxWriteId());
        }
        if (baseDir != null) {
            job.set(BASE_DIR, baseDir.toString());
        }
        job.set(DELTA_DIRS, deltaDirs.toString());
        job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
        job.setLong(MIN_TXN, minTxn);
        job.setLong(MAX_TXN, maxTxn);
        job.setBoolean("mapreduce.reduce.speculative", false);
        job.setBoolean("mapreduce.map.speculative", false);

        // Collect every path the job touches so tokens can be obtained for their namenodes.
        List<Path> dirs = new ArrayList<Path>();
        if (baseDir != null) {
            dirs.add(baseDir);
        }
        dirs.addAll(deltaDirs);
        dirs.addAll(dirsToSearch);
        TokenCache.obtainTokensForNamenodes((Credentials)job.getCredentials(), (Path[])dirs.toArray(new Path[0]), (Configuration)job);

        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
            this.mrJob = job;
        }
        LOG.info("Submitting " + compactionType + " compaction job '" + job.getJobName() + "' to " + job.getQueueName() + " queue.  (current delta dirs count=" + curDirNumber + ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");

        // The client is created in the resource header so it is always closed, including on failure.
        try (JobClient jc = new JobClient(job)) {
            RunningJob rj = jc.submitJob(job);
            LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
            try {
                msc.setHadoopJobid(rj.getID().toString(), id);
            }
            catch (TException e) {
                // Best effort: failing to record the job id must not abort the compaction.
                LOG.warn("Error setting hadoop job, jobId=" + rj.getID().toString() + " compactionId=" + id, (Throwable)e);
            }
            rj.waitForCompletion();
            if (!rj.isSuccessful()) {
                throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") + " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID());
            }
        }
    }

    /**
     * Stores the comma-separated column names and column types of {@code cols} in the job under
     * "schema.evolution.columns" and "schema.evolution.columns.types", and sets the Hive input
     * format variable to {@link HiveInputFormat}.
     *
     * @param job  job configuration to update
     * @param cols table columns, in order
     */
    private void setColumnTypes(JobConf job, List<FieldSchema> cols) {
        StringBuilder names = new StringBuilder();
        StringBuilder types = new StringBuilder();
        // Empty before the first column, a comma before every following one.
        String separator = "";
        for (FieldSchema column : cols) {
            names.append(separator).append(column.getName());
            types.append(separator).append(column.getType());
            separator = ",";
        }
        job.set("schema.evolution.columns", names.toString());
        job.set("schema.evolution.columns.types", types.toString());
        HiveConf.setVar((Configuration)job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
    }

    /**
     * Returns the JobConf remembered by the last job launch, which is only recorded when
     * HIVE_IN_TEST is enabled.
     *
     * @return the recorded job configuration, or {@code null} if none was recorded
     */
    public JobConf getMrJob() {
        return this.mrJob;
    }

    /**
     * Loads {@code classname}, creates an instance through its no-argument constructor and
     * returns it as {@code classType}.
     *
     * @param classType expected type of the created object
     * @param classname name of the class to load and instantiate
     * @param <T>       expected type
     * @return the new instance
     * @throws IOException if the class cannot be loaded or instantiated, or if the instance is
     *                     not assignable to {@code classType}
     */
    private static <T> T instantiate(Class<T> classType, String classname) throws IOException {
        try {
            Class<?> c = JavaUtils.loadClass(classname);
            Object o = c.newInstance();
            if (!classType.isAssignableFrom(o.getClass())) {
                String s = classname + " is not an instance of " + classType.getName();
                LOG.error(s);
                throw new IOException(s);
            }
            // Checked cast: assignability was verified just above.
            return classType.cast(o);
        }
        catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            LOG.error("Unable to instantiate class, " + StringUtils.stringifyException((Throwable)e));
            throw new IOException(e);
        }
    }

    static class CompactorOutputCommitter
    extends OutputCommitter {
        CompactorOutputCommitter() {
        }

        public void setupJob(JobContext jobContext) throws IOException {
        }

        public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
        }

        public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
            return false;
        }

        public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        }

        public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
        }

        public void commitJob(JobContext context) throws IOException {
            JobConf conf = ShimLoader.getHadoopShims().getJobConf(context);
            Path tmpLocation = new Path(conf.get(CompactorMR.TMP_LOCATION));
            Path finalLocation = new Path(conf.get(CompactorMR.FINAL_LOCATION));
            FileSystem fs = tmpLocation.getFileSystem((Configuration)conf);
            LOG.debug("Moving contents of " + tmpLocation.toString() + " to " + finalLocation.toString());
            if (!fs.exists(tmpLocation)) {
                AcidOutputFormat.Options options = new AcidOutputFormat.Options((Configuration)conf).writingBase(conf.getBoolean(CompactorMR.IS_MAJOR, false)).isCompressed(conf.getBoolean(CompactorMR.IS_COMPRESSED, false)).minimumWriteId(conf.getLong(CompactorMR.MIN_TXN, Long.MAX_VALUE)).maximumWriteId(conf.getLong(CompactorMR.MAX_TXN, Long.MIN_VALUE)).bucket(0).statementId(-1).visibilityTxnId(CompactorMap.getCompactorTxnId((Configuration)conf));
                Path newDeltaDir = AcidUtils.createFilename(finalLocation, options).getParent();
                LOG.info(context.getJobID() + ": " + tmpLocation + " not found.  Assuming 0 splits.  Creating " + newDeltaDir);
                fs.mkdirs(newDeltaDir);
                if (options.isWriteVersionFile()) {
                    AcidUtils.OrcAcidVersion.writeVersionFile(newDeltaDir, fs);
                }
                return;
            }
            FileStatus[] contents = fs.listStatus(tmpLocation);
            AcidOutputFormat.Options options = new AcidOutputFormat.Options((Configuration)conf);
            for (FileStatus fileStatus : contents) {
                Path tmpPath = fileStatus.getPath();
                Path newPath = new Path(finalLocation, tmpPath.getName());
                if (options.isWriteVersionFile()) {
                    AcidUtils.OrcAcidVersion.writeVersionFile(tmpPath, fs);
                }
                fs.rename(tmpPath, newPath);
            }
            fs.delete(tmpLocation, true);
        }

        public void abortJob(JobContext context, int status) throws IOException {
            JobConf conf = ShimLoader.getHadoopShims().getJobConf(context);
            Path tmpLocation = new Path(conf.get(CompactorMR.TMP_LOCATION));
            FileSystem fs = tmpLocation.getFileSystem((Configuration)conf);
            LOG.debug("Removing " + tmpLocation.toString());
            fs.delete(tmpLocation, true);
        }
    }

    /**
     * A list of paths with a length-prefixed string form:
     * {@code <count>:<len1>:<path1><len2>:<path2>...}. The string constructor parses what
     * {@link #toString()} produces.
     */
    static class StringableList
    extends ArrayList<Path> {
        StringableList() {
        }

        /**
         * Parses the format written by {@link #toString()}.
         *
         * @param s serialized list
         */
        StringableList(String s) {
            String[] countAndRest = s.split(":", 2);
            int count = Integer.parseInt(countAndRest[0]);
            String remaining = countAndRest[1];
            for (int parsed = 0; parsed < count; ++parsed) {
                // Each element is "<length>:<text>" with the next element following immediately.
                String[] lengthAndRest = remaining.split(":", 2);
                int length = Integer.parseInt(lengthAndRest[0]);
                String payload = lengthAndRest[1];
                String element = payload.substring(0, length);
                remaining = payload.substring(length);
                add(new Path(element));
            }
        }

        /**
         * Serializes the list as the element count followed by each path, prefixed by its length.
         */
        @Override
        public String toString() {
            StringBuilder out = new StringBuilder();
            out.append(size()).append(':');
            for (Path element : this) {
                String text = element.toString();
                out.append(text.length()).append(':').append(text);
            }
            return out.toString();
        }
    }

    static class CompactorMap<V extends Writable>
    implements Mapper<WritableComparable, CompactorInputSplit, NullWritable, NullWritable> {
        JobConf jobConf;
        FileSinkOperator.RecordWriter writer = null;
        FileSinkOperator.RecordWriter deleteEventWriter = null;

        CompactorMap() {
        }

        public void map(WritableComparable key, CompactorInputSplit split, OutputCollector<NullWritable, NullWritable> nullWritableVOutputCollector, Reporter reporter) throws IOException {
            AcidInputFormat aif = (AcidInputFormat)CompactorMR.instantiate(AcidInputFormat.class, this.jobConf.get(CompactorMR.INPUT_FORMAT_CLASS_NAME));
            ValidCompactorWriteIdList writeIdList = new ValidCompactorWriteIdList(this.jobConf.get("hive.txn.valid.writeids"));
            boolean isMajor = this.jobConf.getBoolean(CompactorMR.IS_MAJOR, false);
            AcidInputFormat.RawReader reader = aif.getRawReader((Configuration)this.jobConf, isMajor, split.getBucket(), writeIdList, split.getBaseDir(), split.getDeltaDirs(), split.getDeltasToAttemptId());
            RecordIdentifier identifier = (RecordIdentifier)reader.createKey();
            Writable value = (Writable)reader.createValue();
            this.getWriter(reporter, reader.getObjectInspector(), split.getBucket());
            AcidUtils.AcidOperationalProperties acidOperationalProperties = AcidUtils.getAcidOperationalProperties((Configuration)this.jobConf);
            while (reader.next(identifier, value)) {
                boolean sawDeleteRecord = reader.isDelete(value);
                if (isMajor && sawDeleteRecord) continue;
                if (sawDeleteRecord && acidOperationalProperties.isSplitUpdate()) {
                    if (this.deleteEventWriter == null) {
                        this.getDeleteEventWriter(reporter, reader.getObjectInspector(), split.getBucket());
                    }
                    this.deleteEventWriter.write(value);
                    reporter.progress();
                    continue;
                }
                this.writer.write(value);
                reporter.progress();
            }
        }

        public void configure(JobConf entries) {
            this.jobConf = entries;
        }

        public void close() throws IOException {
            if (this.writer != null) {
                this.writer.close(false);
            }
            if (this.deleteEventWriter != null) {
                this.deleteEventWriter.close(false);
            }
        }

        static long getCompactorTxnId(Configuration jobConf) {
            String snapshot = jobConf.get("hive.txn.valid.txns");
            if (Strings.isNullOrEmpty(snapshot)) {
                throw new IllegalStateException("hive.txn.valid.txns not found for writing to " + jobConf.get(CompactorMR.FINAL_LOCATION));
            }
            ValidReadTxnList validTxnList = new ValidReadTxnList();
            validTxnList.readFromString(snapshot);
            return validTxnList.getHighWatermark();
        }

        private void getWriter(Reporter reporter, ObjectInspector inspector, int bucket) throws IOException {
            if (this.writer == null) {
                AcidOutputFormat.Options options = new AcidOutputFormat.Options((Configuration)this.jobConf);
                options.inspector(inspector).writingBase(this.jobConf.getBoolean(CompactorMR.IS_MAJOR, false)).isCompressed(this.jobConf.getBoolean(CompactorMR.IS_COMPRESSED, false)).tableProperties(new StringableMap(this.jobConf.get(CompactorMR.TABLE_PROPS)).toProperties()).reporter(reporter).minimumWriteId(this.jobConf.getLong(CompactorMR.MIN_TXN, Long.MAX_VALUE)).maximumWriteId(this.jobConf.getLong(CompactorMR.MAX_TXN, Long.MIN_VALUE)).bucket(bucket).statementId(-1).visibilityTxnId(CompactorMap.getCompactorTxnId((Configuration)this.jobConf));
                AcidOutputFormat aof = (AcidOutputFormat)CompactorMR.instantiate(AcidOutputFormat.class, this.jobConf.get(CompactorMR.OUTPUT_FORMAT_CLASS_NAME));
                Path rootDir = new Path(this.jobConf.get(CompactorMR.TMP_LOCATION));
                this.cleanupTmpLocationOnTaskRetry(options, rootDir);
                this.writer = aof.getRawRecordWriter(rootDir, options);
            }
        }

        private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir) throws IOException {
            Path tmpLocation = AcidUtils.createFilename(rootDir, options);
            FileSystem fs = tmpLocation.getFileSystem((Configuration)this.jobConf);
            if (fs.exists(tmpLocation)) {
                fs.delete(tmpLocation, true);
            }
        }

        private void getDeleteEventWriter(Reporter reporter, ObjectInspector inspector, int bucket) throws IOException {
            AcidOutputFormat.Options options = new AcidOutputFormat.Options((Configuration)this.jobConf);
            options.inspector(inspector).writingBase(false).writingDeleteDelta(true).isCompressed(this.jobConf.getBoolean(CompactorMR.IS_COMPRESSED, false)).tableProperties(new StringableMap(this.jobConf.get(CompactorMR.TABLE_PROPS)).toProperties()).reporter(reporter).minimumWriteId(this.jobConf.getLong(CompactorMR.MIN_TXN, Long.MAX_VALUE)).maximumWriteId(this.jobConf.getLong(CompactorMR.MAX_TXN, Long.MIN_VALUE)).bucket(bucket).statementId(-1).visibilityTxnId(CompactorMap.getCompactorTxnId((Configuration)this.jobConf));
            AcidOutputFormat aof = (AcidOutputFormat)CompactorMR.instantiate(AcidOutputFormat.class, this.jobConf.get(CompactorMR.OUTPUT_FORMAT_CLASS_NAME));
            Path rootDir = new Path(this.jobConf.get(CompactorMR.TMP_LOCATION));
            this.cleanupTmpLocationOnTaskRetry(options, rootDir);
            this.deleteEventWriter = aof.getRawRecordWriter(rootDir, options);
        }
    }

    static class CompactorRecordReader
    implements RecordReader<NullWritable, CompactorInputSplit> {
        private CompactorInputSplit split;

        CompactorRecordReader(CompactorInputSplit split) {
            this.split = split;
        }

        public boolean next(NullWritable key, CompactorInputSplit compactorInputSplit) throws IOException {
            if (this.split != null) {
                compactorInputSplit.set(this.split);
                this.split = null;
                return true;
            }
            return false;
        }

        public NullWritable createKey() {
            return NullWritable.get();
        }

        public CompactorInputSplit createValue() {
            return new CompactorInputSplit();
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public void close() throws IOException {
        }

        public float getProgress() throws IOException {
            return 0.0f;
        }
    }

    static class CompactorInputFormat
    implements InputFormat<NullWritable, CompactorInputSplit> {
        CompactorInputFormat() {
        }

        public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
            Path baseDir = null;
            if (entries.get(CompactorMR.BASE_DIR) != null) {
                baseDir = new Path(entries.get(CompactorMR.BASE_DIR));
            }
            StringableList tmpDeltaDirs = new StringableList(entries.get(CompactorMR.DELTA_DIRS));
            Path[] deltaDirs = tmpDeltaDirs.toArray(new Path[tmpDeltaDirs.size()]);
            StringableList dirsToSearch = new StringableList(entries.get(CompactorMR.DIRS_TO_SEARCH));
            HashMap<Integer, BucketTracker> splitToBucketMap = new HashMap<Integer, BucketTracker>();
            for (Path dir : dirsToSearch) {
                FileSystem fs = dir.getFileSystem((Configuration)entries);
                if (dir.getName().startsWith("base_") || dir.getName().startsWith("delta_") || dir.getName().startsWith("delete_delta_")) {
                    FileStatus[] files;
                    boolean sawBase = dir.getName().startsWith("base_");
                    boolean isRawFormat = !dir.getName().startsWith("delete_delta_") && AcidUtils.MetaDataFile.isRawFormat(dir, fs, null);
                    for (FileStatus f : files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter)) {
                        Matcher matcher = isRawFormat ? AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(f.getPath().getName()) : AcidUtils.BUCKET_PATTERN.matcher(f.getPath().getName());
                        this.addFileToMap(matcher, f.getPath(), sawBase, splitToBucketMap);
                    }
                    continue;
                }
                Matcher matcher = AcidUtils.LEGACY_BUCKET_DIGIT_PATTERN.matcher(dir.getName());
                this.addFileToMap(matcher, dir, true, splitToBucketMap);
            }
            boolean isTableBucketed = entries.getInt(CompactorMR.NUM_BUCKETS, -1) != -1;
            ArrayList<CompactorInputSplit> splits = new ArrayList<CompactorInputSplit>(splitToBucketMap.size());
            for (Map.Entry e : splitToBucketMap.entrySet()) {
                BucketTracker bt = (BucketTracker)e.getValue();
                Path[] deltasForSplit = isTableBucketed ? deltaDirs : CompactorInputFormat.getDeltaDirsFromBucketTracker(bt);
                splits.add(new CompactorInputSplit((Configuration)entries, (Integer)e.getKey(), bt.buckets, (Path)(bt.sawBase ? baseDir : null), deltasForSplit, bt.deltasToAttemptId));
            }
            LOG.debug("Returning " + splits.size() + " splits");
            return splits.toArray(new InputSplit[splits.size()]);
        }

        private static Path[] getDeltaDirsFromBucketTracker(BucketTracker bucketTracker) {
            ArrayList<Path> resultList = new ArrayList<Path>(bucketTracker.buckets.size());
            for (int i = 0; i < bucketTracker.buckets.size(); ++i) {
                Path p = bucketTracker.buckets.get(i).getParent();
                if (!p.getName().startsWith("delta_") && !p.getName().startsWith("delete_delta_")) continue;
                resultList.add(p);
            }
            return resultList.toArray(new Path[0]);
        }

        public RecordReader<NullWritable, CompactorInputSplit> getRecordReader(InputSplit inputSplit, JobConf entries, Reporter reporter) throws IOException {
            return new CompactorRecordReader((CompactorInputSplit)inputSplit);
        }

        private void addFileToMap(Matcher matcher, Path file, boolean sawBase, Map<Integer, BucketTracker> splitToBucketMap) {
            if (!matcher.find()) {
                String msg = "Found a non-bucket file that we thought matched the bucket pattern! " + file.toString() + " Matcher=" + matcher.toString();
                LOG.error(msg);
                throw new IllegalArgumentException(msg);
            }
            int bucketNum = -1;
            Integer attemptId = null;
            if (matcher.groupCount() > 0) {
                bucketNum = Integer.parseInt(matcher.group(1));
                attemptId = matcher.group(2) != null ? Integer.valueOf(matcher.group(2).substring(1)) : null;
            } else {
                bucketNum = Integer.valueOf(matcher.group());
            }
            BucketTracker bt = splitToBucketMap.get(bucketNum);
            if (bt == null) {
                bt = new BucketTracker();
                splitToBucketMap.put(bucketNum, bt);
            }
            LOG.debug("Adding " + file.toString() + " to list of files for splits");
            bt.buckets.add(file);
            bt.sawBase |= sawBase;
            bt.deltasToAttemptId.put(file.getParent().getName(), attemptId);
        }

        private static class BucketTracker {
            boolean sawBase = false;
            List<Path> buckets = new ArrayList<Path>();
            Map<String, Integer> deltasToAttemptId = new HashMap<String, Integer>();

            BucketTracker() {
            }
        }
    }

    /**
     * Split describing the work for one bucket: an optional base directory, the delta
     * directories, the hosts holding the underlying files, and the attempt id recorded for
     * each directory name.
     *
     * <p>Wire format produced by {@link #write(DataOutput)} and consumed by
     * {@link #readFields(DataInput)}: total length (long), location count followed by
     * length-prefixed location strings, bucket number, length-prefixed base path (length 0 when
     * there is no base) followed by its length-prefixed attempt id (length 0 when absent), then
     * the delta count followed by, per delta, a length-prefixed path and a length-prefixed
     * attempt id. All strings are UTF-8 and the length prefix is the encoded byte count, which
     * for pure ASCII content equals the character count.
     */
    static class CompactorInputSplit
    implements InputSplit {
        /** Charset used for every string on the wire; fully qualified to avoid a new import. */
        private static final java.nio.charset.Charset WIRE_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

        private long length = 0L;
        private List<String> locations;
        private int bucketNum;
        private Path base;
        private Path[] deltas;
        private Map<String, Integer> deltasToAttemptId;

        /** No-argument constructor; the fields are expected to be filled by {@link #readFields(DataInput)}. */
        public CompactorInputSplit() {
        }

        /**
         * Builds a split and computes its total length and host list from {@code files}.
         *
         * @param hadoopConf configuration used to resolve each file's file system
         * @param bucket bucket number of this split
         * @param files files whose lengths are summed and whose block hosts are collected
         * @param base base directory, or {@code null} when there is none
         * @param deltas delta directories (stored by reference)
         * @param deltasToAttemptId directory name to attempt id (stored by reference)
         * @throws IOException if a file status or its block locations cannot be obtained
         */
        CompactorInputSplit(Configuration hadoopConf, int bucket, List<Path> files, Path base, Path[] deltas, Map<String, Integer> deltasToAttemptId) throws IOException {
            this.bucketNum = bucket;
            this.base = base;
            this.deltas = deltas;
            this.locations = new ArrayList<String>();
            this.deltasToAttemptId = deltasToAttemptId;
            for (Path path : files) {
                FileSystem fs = path.getFileSystem(hadoopConf);
                FileStatus stat = fs.getFileStatus(path);
                long fileLen = stat.getLen();
                this.length += fileLen;
                // Ask for the extent of this file only; the running total is not a valid range for it.
                BlockLocation[] locs = fs.getFileBlockLocations(stat, 0L, fileLen);
                for (BlockLocation loc : locs) {
                    for (String host : loc.getHosts()) {
                        this.locations.add(host);
                    }
                }
            }
        }

        /** Returns the summed length of all files backing this split. */
        public long getLength() throws IOException {
            return this.length;
        }

        /** Returns the collected host names as a fresh array (duplicates are kept). */
        public String[] getLocations() throws IOException {
            return this.locations.toArray(new String[this.locations.size()]);
        }

        /**
         * Serializes this split using the wire format described on the class.
         *
         * @param dataOutput destination stream
         * @throws IOException if writing fails
         */
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeLong(this.length);
            dataOutput.writeInt(this.locations.size());
            for (String location : this.locations) {
                writeString(dataOutput, location);
            }
            dataOutput.writeInt(this.bucketNum);
            if (this.base == null) {
                // A zero length marks "no base"; no attempt id follows in that case.
                dataOutput.writeInt(0);
            } else {
                writeString(dataOutput, this.base.toString());
                writeAttemptId(dataOutput, attemptIdFor(this.base));
            }
            dataOutput.writeInt(this.deltas.length);
            for (Path delta : this.deltas) {
                writeString(dataOutput, delta.toString());
                writeAttemptId(dataOutput, attemptIdFor(delta));
            }
        }

        /**
         * Replaces the state of this split with the one read from {@code dataInput}.
         *
         * <p>The base attempt id, when present, is registered under both the base directory
         * name and the full base path string, regardless of how many deltas the split has.
         *
         * @param dataInput source stream positioned at a serialized split
         * @throws IOException if reading fails or the stream ends prematurely
         */
        public void readFields(DataInput dataInput) throws IOException {
            // The instance may be reused for several splits; never keep a base from a previous one.
            this.base = null;
            this.locations = new ArrayList<String>();
            this.length = dataInput.readLong();
            LOG.debug("Read length of " + this.length);
            int numElements = dataInput.readInt();
            LOG.debug("Read numElements of " + numElements);
            for (int i = 0; i < numElements; ++i) {
                int len = dataInput.readInt();
                LOG.debug("Read file length of " + len);
                this.locations.add(readString(dataInput, len));
            }
            this.bucketNum = dataInput.readInt();
            LOG.debug("Read bucket number of " + this.bucketNum);
            int baseLen = dataInput.readInt();
            LOG.debug("Read base path length of " + baseLen);
            Integer baseAttemptId = null;
            if (baseLen > 0) {
                this.base = new Path(readString(dataInput, baseLen));
                baseAttemptId = readAttemptId(dataInput);
            }
            numElements = dataInput.readInt();
            this.deltasToAttemptId = new HashMap<String, Integer>();
            this.deltas = new Path[numElements];
            for (int i = 0; i < numElements; ++i) {
                int pathLen = dataInput.readInt();
                this.deltas[i] = new Path(readString(dataInput, pathLen));
                this.deltasToAttemptId.put(this.deltas[i].getName(), readAttemptId(dataInput));
            }
            if (baseAttemptId != null) {
                // write() looks the base up by directory name, so that key must exist even when
                // there are no deltas; the full-path key is kept for existing readers of the map.
                this.deltasToAttemptId.put(this.base.getName(), baseAttemptId);
                this.deltasToAttemptId.put(this.base.toString(), baseAttemptId);
            }
        }

        /** Copies every field of {@code other} into this split; collections are shared, not cloned. */
        public void set(CompactorInputSplit other) {
            this.length = other.length;
            this.locations = other.locations;
            this.bucketNum = other.bucketNum;
            this.base = other.base;
            this.deltas = other.deltas;
            this.deltasToAttemptId = other.deltasToAttemptId;
        }

        int getBucket() {
            return this.bucketNum;
        }

        Path getBaseDir() {
            return this.base;
        }

        Path[] getDeltaDirs() {
            return this.deltas;
        }

        Map<String, Integer> getDeltasToAttemptId() {
            return this.deltasToAttemptId;
        }

        /** Renders base, bucket, length and delta directory names; safe on a split that was never populated. */
        @Override
        public String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("CompactorInputSplit{base: ");
            builder.append(this.base);
            builder.append(", bucket: ");
            builder.append(this.bucketNum);
            builder.append(", length: ");
            builder.append(this.length);
            builder.append(", deltas: [");
            if (this.deltas != null) {
                for (int i = 0; i < this.deltas.length; ++i) {
                    if (i != 0) {
                        builder.append(", ");
                    }
                    builder.append(this.deltas[i].getName());
                }
            }
            builder.append("]}");
            return builder.toString();
        }

        /** Looks up the attempt id recorded for the name of {@code dir}; {@code null} when unknown or when no map is set. */
        private Integer attemptIdFor(Path dir) {
            return this.deltasToAttemptId == null ? null : this.deltasToAttemptId.get(dir.getName());
        }

        /** Writes {@code value} as a byte-length prefix followed by its UTF-8 bytes. */
        private static void writeString(DataOutput out, String value) throws IOException {
            byte[] encoded = value.getBytes(WIRE_CHARSET);
            out.writeInt(encoded.length);
            out.write(encoded);
        }

        /** Writes an attempt id as a length-prefixed decimal string, or a lone zero length when it is {@code null}. */
        private static void writeAttemptId(DataOutput out, Integer attemptId) throws IOException {
            if (attemptId == null) {
                out.writeInt(0);
            } else {
                writeString(out, attemptId.toString());
            }
        }

        /** Reads exactly {@code byteLen} bytes and decodes them as UTF-8. */
        private static String readString(DataInput in, int byteLen) throws IOException {
            byte[] buf = new byte[byteLen];
            in.readFully(buf);
            return new String(buf, WIRE_CHARSET);
        }

        /** Reads a length-prefixed attempt id; a non-positive length means "absent" and yields {@code null}. */
        private static Integer readAttemptId(DataInput in) throws IOException {
            int len = in.readInt();
            return len > 0 ? Integer.valueOf(readString(in, len)) : null;
        }
    }
}

