/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.sort.impl.dflt;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.Progress;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.ExternalSorter;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSorter
extends ExternalSorter
implements IndexedSortable {
    /** Class logger. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSorter.class);
    /** Bytes accounted per partition for one spill index record (used when sizing index files and the index cache). */
    public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
    /**
     * Approximate per-partition header overhead in bytes.
     * NOTE(review): the size estimates in this decompiled file use the literal 150 — presumably this constant inlined.
     */
    private static final int APPROX_HEADER_LENGTH = 150;
    /** Int view over {@link #kvbuffer}; per-record metadata is read and written through it. */
    private final IntBuffer kvmeta;
    // kvstart/kvend: int indexes into kvmeta bounding the metadata handed to a spill (see startSpill/resetSpill).
    int kvstart;
    int kvend;
    // kvindex: int index into kvmeta where the next record's metadata is stored; collect() moves it down by 4 per record.
    int kvindex;
    // equator: byte position in kvbuffer set by setEquator(); kvindex is derived from it.
    int equator;
    // bufstart/bufend: byte positions in kvbuffer bounding the serialized data handed to a spill.
    int bufstart;
    int bufend;
    // bufmark: byte position copied into bufend when a spill starts.
    int bufmark;
    // bufindex: byte position in kvbuffer at which the next key starts being serialized.
    int bufindex;
    // bufvoid: initialised to kvbuffer.length; used as the modulus for byte distances and spill size estimates.
    int bufvoid;
    /** Single backing array holding both serialized records and (through {@link #kvmeta}) their metadata. */
    private final byte[] kvbuffer;
    /** Empty array used for the zero-length write issued after each value in collect(). */
    private final byte[] b0 = new byte[0];
    // Int offsets of the fields inside one metadata record; the decompiled code uses the literals +0..+3.
    protected static final int VALSTART = 0;
    protected static final int KEYSTART = 1;
    protected static final int PARTITION = 2;
    protected static final int VALLEN = 3;
    /** Number of ints per metadata record. */
    protected static final int NMETA = 4;
    /** Number of bytes per metadata record. */
    protected static final int METASIZE = 16;
    /** Maximum number of metadata records: kvmeta.capacity() / 4. */
    final int maxRec;
    /** Buffer usage (bytes) at which a spill is started: kvbuffer.length * spill percent. */
    final int softLimit;
    /** True from startSpill() onwards; flush() waits on spillDone while it is set. */
    boolean spillInProgress;
    /** Byte budget left before collect() re-evaluates whether to reset or start a spill. */
    int bufferRemaining;
    /** Failure recorded for the spill path; surfaced to callers by checkSpillException(). */
    volatile Throwable sortSpillException = null;
    /** Minimum number of spills before the combiner is run during the final merge. */
    final int minSpillsForCombine;
    /** Lock guarding the spill state shared with the spill thread. */
    final ReentrantLock spillLock = new ReentrantLock();
    /** Awaited by the constructor (thread start-up) and by flush() (spill completion). */
    final Condition spillDone = this.spillLock.newCondition();
    /** Signalled by startSpill() to hand work to the spill thread. */
    final Condition spillReady = this.spillLock.newCondition();
    /** Output stream the key and value serializers write into. */
    final BlockingBuffer bb = new BlockingBuffer();
    /** The constructor blocks until this becomes true. */
    volatile boolean spillThreadRunning = false;
    /** Background thread started by the constructor and joined by flush(). */
    final SpillThread spillThread = new SpillThread();
    /** Spill index records kept in memory, one per spill, until the cache limit is reached. */
    final ArrayList<TezSpillRecord> indexCacheList = new ArrayList();
    /** Upper bound (bytes) on in-memory spill index records; beyond it index records are written to files. */
    private final int indexCacheMemoryLimit;
    /** Running total of bytes accounted for the cached index records. */
    private int totalIndexCacheMemory;
    /** Number of records collected into the buffer. */
    private long totalKeys = 0L;
    /** Number of comparisons within one partition that found equal keys; feeds isRLENeeded(). */
    private long sameKey = 0L;
    /** Largest sort buffer size (MB) this sorter accepts; larger requests are scaled down. */
    public static final int MAX_IO_SORT_MB = 1800;
    /** Scratch space for swapping two 16-byte metadata records. */
    final byte[] META_BUFFER_TMP = new byte[16];

    /**
     * Allocates the in-memory sort buffer, opens the serializers on it and starts the
     * background spill thread, blocking until that thread reports that it is running.
     *
     * @param outputContext context of the output this sorter belongs to
     * @param conf runtime configuration (spill percent, index cache limit, combine threshold)
     * @param numOutputs forwarded to the {@code ExternalSorter} constructor
     * @param initialMemoryAvailable forwarded to the {@code ExternalSorter} constructor
     * @throws IOException if the spill thread fails to initialize or the wait for it is interrupted
     * @throws IllegalArgumentException if the configured spill percent is outside (0, 1]
     */
    public DefaultSorter(OutputContext outputContext, Configuration conf, int numOutputs, long initialMemoryAvailable) throws IOException {
        super(outputContext, conf, numOutputs, initialMemoryAvailable);
        float spillper = this.conf.getFloat("tez.runtime.sort.spill.percent", 0.8f);
        int sortmb = DefaultSorter.computeSortBufferSize((int)this.availableMemoryMb, outputContext.getDestinationVertexName());
        Preconditions.checkArgument(spillper <= 1.0f && spillper > 0.0f, (Object)"tez.runtime.sort.spill.percent should be greater than 0 and less than or equal to 1");
        this.indexCacheMemoryLimit = this.conf.getInt("tez.runtime.index.cache.memory.limit.bytes", 0x100000);
        boolean confPipelinedShuffle = this.conf.getBoolean("tez.runtime.pipelined-shuffle.enabled", false);
        if (confPipelinedShuffle) {
            LOG.warn(outputContext.getDestinationVertexName() + ": " + "tez.runtime.pipelined-shuffle.enabled" + " does not work " + "with DefaultSorter. It is supported only with PipelinedSorter.");
        }
        // Buffer size in bytes, rounded down to a whole number of 16-byte metadata records.
        int maxMemUsage = sortmb << 20;
        maxMemUsage -= maxMemUsage % 16;
        this.kvbuffer = new byte[maxMemUsage];
        this.bufvoid = this.kvbuffer.length;
        // Metadata shares the backing array with the serialized records.
        this.kvmeta = ByteBuffer.wrap(this.kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
        this.setEquator(0);
        this.bufend = this.bufindex = this.equator;
        this.bufstart = this.bufindex;
        this.kvstart = this.kvend = this.kvindex;
        this.maxRec = this.kvmeta.capacity() / 4;
        this.softLimit = (int)((float)this.kvbuffer.length * spillper);
        this.bufferRemaining = this.softLimit;
        if (LOG.isInfoEnabled()) {
            LOG.info(outputContext.getDestinationVertexName() + ": " + "tez.runtime.io.sort.mb" + "=" + sortmb + ", soft limit=" + this.softLimit + ", bufstart=" + this.bufstart + ", bufvoid=" + this.bufvoid + ", kvstart=" + this.kvstart + ", length=" + this.maxRec + ", finalMergeEnabled=" + this.isFinalMergeEnabled());
        }
        this.valSerializer.open((OutputStream)this.bb);
        this.keySerializer.open((OutputStream)this.bb);
        this.spillInProgress = false;
        this.minSpillsForCombine = this.conf.getInt("tez.runtime.combine.min.spills", 3);
        this.spillThread.setDaemon(true);
        this.spillThread.setName("SpillThread {" + TezUtilsInternal.cleanVertexName((String)(outputContext.getDestinationVertexName() + "}")));
        this.spillLock.lock();
        try {
            this.spillThread.start();
            // Wait until the spill thread has flagged itself as running.
            while (!this.spillThreadRunning) {
                this.spillDone.await();
            }
        }
        catch (InterruptedException e) {
            // Keep the interrupt visible to callers; it is otherwise lost once wrapped in an IOException.
            Thread.currentThread().interrupt();
            throw new IOException("Spill thread failed to initialize", e);
        }
        finally {
            this.spillLock.unlock();
        }
        if (this.sortSpillException != null) {
            throw new IOException("Spill thread failed to initialize", this.sortSpillException);
        }
    }

    /**
     * Clamps the requested sort buffer size to the maximum this sorter supports.
     *
     * @param availableMemoryMB requested buffer size in megabytes; must be positive
     * @param logContext prefix for the warning logged when the size is scaled down
     * @return {@code availableMemoryMB}, capped at 1800 (the value of {@code MAX_IO_SORT_MB})
     * @throws RuntimeException if {@code availableMemoryMB} is zero or negative
     */
    @VisibleForTesting
    static int computeSortBufferSize(int availableMemoryMB, String logContext) {
        if (availableMemoryMB <= 0) {
            throw new RuntimeException("tez.runtime.io.sort.mb=" + availableMemoryMB + ". It should be > 0");
        }
        if (availableMemoryMB > 1800) {
            // The original message ran "for" and "DefaultSorter" together.
            LOG.warn(logContext + ": Scaling down " + "tez.runtime.io.sort.mb" + "=" + availableMemoryMB + " to " + 1800 + " (max sort buffer size supported for DefaultSorter)");
        }
        return Math.min(1800, availableMemoryMB);
    }

    /**
     * Routes a key/value pair to the partition chosen by the partitioner and collects it.
     *
     * @param key record key
     * @param value record value
     * @throws IOException if collecting the record fails
     */
    @Override
    public void write(Object key, Object value) throws IOException {
        final int targetPartition = this.partitioner.getPartition(key, value, this.partitions);
        this.collect(key, value, targetPartition);
    }

    /**
     * Serializes one record into the in-memory buffer and stores its metadata, starting a
     * spill or reclaiming a finished one when the remaining byte budget is used up.
     * A record that does not fit the buffer is written straight to its own spill file.
     *
     * @param key record key; its class must be exactly {@code keyClass}
     * @param value record value; its class must be exactly {@code valClass}
     * @param partition target partition, in {@code [0, partitions)}
     * @throws IOException on a type or partition mismatch, a previously recorded spill failure,
     *         or a serialization/spill error
     */
    synchronized void collect(Object key, Object value, int partition) throws IOException {
        if (key.getClass() != this.keyClass) {
            throw new IOException("Type mismatch in key from map: expected " + this.keyClass.getName() + ", received " + key.getClass().getName());
        }
        if (value.getClass() != this.valClass) {
            throw new IOException("Type mismatch in value from map: expected " + this.valClass.getName() + ", received " + value.getClass().getName());
        }
        if (partition < 0 || partition >= this.partitions) {
            throw new IOException("Illegal partition for " + key + " (" + partition + ")" + ", TotalPartitions: " + this.partitions);
        }
        this.checkSpillException();
        // Reserve the 16 bytes this record's metadata will occupy.
        this.bufferRemaining -= 16;
        if (this.bufferRemaining <= 0) {
            this.spillLock.lock();
            try {
                if (!this.spillInProgress) {
                    boolean bufsoftlimit;
                    // Byte positions of the metadata cursor and of the end of the last spill's metadata.
                    int kvbidx = 4 * this.kvindex;
                    int kvbend = 4 * this.kvend;
                    // Bytes in use between the metadata cursor and the serialization cursor.
                    int bUsed = this.distanceTo(kvbidx, this.bufindex);
                    boolean bl = bufsoftlimit = bUsed >= this.softLimit;
                    if ((kvbend + 16) % this.kvbuffer.length != this.equator - this.equator % 16) {
                        // kvend no longer sits next to the equator: a spill has completed, so reclaim its space.
                        this.resetSpill();
                        this.bufferRemaining = Math.min(this.distanceTo(this.bufindex, kvbidx) - 32, this.softLimit - bUsed) - 16;
                    } else if (bufsoftlimit && this.kvindex != this.kvend) {
                        // Soft limit reached with records present: hand them to the spill thread.
                        this.startSpill();
                        // Average serialized record size so far, used to place the new equator.
                        int avgRec = (int)(this.mapOutputByteCounter.getValue() / this.mapOutputRecordCounter.getValue());
                        int distkvi = this.distanceTo(this.bufindex, kvbidx);
                        int newPos = (this.bufindex + Math.max(31, Math.min(distkvi / 2, distkvi / (16 + avgRec) * 16))) % this.kvbuffer.length;
                        this.setEquator(newPos);
                        this.bufmark = this.bufindex = newPos;
                        int serBound = 4 * this.kvend;
                        // Budget is bounded by the free space on either side of the new equator and by the soft limit.
                        this.bufferRemaining = Math.min(this.distanceTo(this.bufend, newPos), Math.min(this.distanceTo(newPos, serBound), this.softLimit)) - 32;
                    }
                }
            }
            finally {
                this.spillLock.unlock();
            }
        }
        try {
            int keystart = this.bufindex;
            this.keySerializer.serialize(key);
            if (this.bufindex < keystart) {
                // The serialization cursor moved behind the key start, i.e. the key wrapped around the buffer end.
                // NOTE(review): shiftBufferedKey presumably makes the key contiguous from offset 0 — confirm in BlockingBuffer.
                this.bb.shiftBufferedKey();
                keystart = 0;
            }
            int valstart = this.bufindex;
            this.valSerializer.serialize(value);
            // Zero-length write. NOTE(review): presumably forces the buffer's bounds handling even for empty values — confirm.
            this.bb.write(this.b0, 0, 0);
            int valend = this.bb.markRecord();
            this.mapOutputRecordCounter.increment(1L);
            this.mapOutputByteCounter.increment((long)this.distanceTo(keystart, valend, this.bufvoid));
            // Store the metadata record: partition, key start, value start, value length.
            this.kvmeta.put(this.kvindex + 2, partition);
            this.kvmeta.put(this.kvindex + 1, keystart);
            this.kvmeta.put(this.kvindex + 0, valstart);
            this.kvmeta.put(this.kvindex + 3, this.distanceTo(valstart, valend));
            // Step the metadata cursor down by one record (4 ints), wrapping at the start of the buffer.
            this.kvindex = (int)(((long)this.kvindex - 4L + (long)this.kvmeta.capacity()) % (long)this.kvmeta.capacity());
            ++this.totalKeys;
        }
        catch (ExternalSorter.MapBufferTooSmallException e) {
            LOG.info(this.outputContext.getDestinationVertexName() + ": Record too large for in-memory buffer: " + e.getMessage());
            // The record cannot be buffered: write it directly to a spill file of its own.
            this.spillSingleRecord(key, value, partition);
            this.mapOutputRecordCounter.increment(1L);
            return;
        }
    }

    /**
     * Moves the equator to {@code pos} and places the metadata cursor one record below the
     * equator's 16-byte-aligned position, wrapping around the start of the buffer.
     *
     * @param pos new equator, as a byte position in the buffer
     */
    private void setEquator(int pos) {
        this.equator = pos;
        final long capacityBytes = this.kvbuffer.length;
        // Align down to a metadata record boundary, step back one record, then wrap.
        final int alignedPos = pos - pos % 16;
        final long wrappedBytePos = ((long)alignedPos - 16L + capacityBytes) % capacityBytes;
        this.kvindex = (int)wrappedBytePos / 4;
        if (!LOG.isInfoEnabled()) {
            return;
        }
        LOG.info(this.outputContext.getDestinationVertexName() + ": " + "(EQUATOR) " + pos + " kvi " + this.kvindex + "(" + this.kvindex * 4 + ")");
    }

    /**
     * Collapses the spill window onto the current equator: both byte bounds become the
     * equator and both metadata bounds become the record slot just below it.
     */
    private void resetSpill() {
        final int eq = this.equator;
        this.bufend = eq;
        this.bufstart = eq;
        final long capacityBytes = this.kvbuffer.length;
        // Same alignment and wrap rule as used for the metadata cursor in setEquator().
        final int alignedEq = eq - eq % 16;
        final long wrappedBytePos = ((long)alignedEq - 16L + capacityBytes) % capacityBytes;
        final int metaSlot = (int)wrappedBytePos / 4;
        this.kvend = metaSlot;
        this.kvstart = metaSlot;
        if (!LOG.isInfoEnabled()) {
            return;
        }
        LOG.info(this.outputContext.getDestinationVertexName() + ": " + "(RESET) equator " + eq + " kv " + this.kvstart + "(" + this.kvstart * 4 + ")" + " kvi " + this.kvindex + "(" + this.kvindex * 4 + ")");
    }

    /**
     * Forward distance from {@code i} to {@code j} around the sort buffer.
     *
     * @param i start byte position
     * @param j end byte position
     * @return distance from {@code i} to {@code j}, modulo the buffer length
     */
    final int distanceTo(int i, int j) {
        final int ringSize = this.kvbuffer.length;
        return this.distanceTo(i, j, ringSize);
    }

    /**
     * Forward distance from {@code i} to {@code j} on a ring of size {@code mod}.
     *
     * @param i start position
     * @param j end position
     * @param mod ring size
     * @return {@code j - i} when {@code i <= j}, otherwise the wrapped distance {@code mod - i + j}
     */
    int distanceTo(int i, int j, int mod) {
        if (i <= j) {
            return j - i;
        }
        // j lies behind i: go to the end of the ring, then on from its start.
        return mod - i + j;
    }

    /**
     * Converts a record position into the int offset of its metadata record.
     *
     * @param metapos record position; reduced modulo {@code maxRec}
     * @return offset of the record's first metadata int
     */
    int offsetFor(int metapos) {
        final int slot = metapos % this.maxRec;
        return slot * 4;
    }

    /**
     * Orders two buffered records by partition first and, within a partition, by key bytes
     * using the configured comparator. Equal keys are counted in {@code sameKey}.
     *
     * @param mi position of the first record
     * @param mj position of the second record
     * @return negative, zero or positive as the first record sorts before, with or after the second
     */
    public int compare(int mi, int mj) {
        final int metaI = this.offsetFor(mi);
        final int metaJ = this.offsetFor(mj);
        final int partitionI = this.kvmeta.get(metaI + 2);
        final int partitionJ = this.kvmeta.get(metaJ + 2);
        if (partitionI != partitionJ) {
            return partitionI - partitionJ;
        }
        // A key spans from its key start up to the record's value start.
        final int keyStartI = this.kvmeta.get(metaI + 1);
        final int keyLenI = this.kvmeta.get(metaI + 0) - this.kvmeta.get(metaI + 1);
        final int keyStartJ = this.kvmeta.get(metaJ + 1);
        final int keyLenJ = this.kvmeta.get(metaJ + 0) - this.kvmeta.get(metaJ + 1);
        final int order = this.comparator.compare(this.kvbuffer, keyStartI, keyLenI, this.kvbuffer, keyStartJ, keyLenJ);
        if (order == 0) {
            ++this.sameKey;
        }
        return order;
    }

    /**
     * Exchanges the 16-byte metadata records of two buffered records; the serialized
     * key/value bytes are left where they are.
     *
     * @param mi position of the first record
     * @param mj position of the second record
     */
    public void swap(int mi, int mj) {
        final byte[] data = this.kvbuffer;
        final byte[] scratch = this.META_BUFFER_TMP;
        final int firstOff = mi % this.maxRec * 16;
        final int secondOff = mj % this.maxRec * 16;
        // first -> scratch, second -> first, scratch -> second
        System.arraycopy(data, firstOff, scratch, 0, 16);
        System.arraycopy(data, secondOff, data, firstOff, 16);
        System.arraycopy(scratch, 0, data, secondOff, 16);
    }

    /**
     * Waits for any running spill, sorts and spills whatever is still buffered, stops the
     * spill thread and merges all spill files into the final output.
     *
     * @throws IOException if a spill failed, the merge fails, or the calling thread is
     *         interrupted while waiting; in the latter case the thread's interrupt status is
     *         restored before the exception is thrown
     */
    @Override
    public void flush() throws IOException {
        LOG.info("Starting flush of map output");
        this.spillLock.lock();
        try {
            while (this.spillInProgress) {
                this.spillDone.await();
            }
            this.checkSpillException();
            int kvbend = 4 * this.kvend;
            if ((kvbend + 16) % this.kvbuffer.length != this.equator - this.equator % 16) {
                // A spill has finished since the last reset: reclaim its space.
                this.resetSpill();
            }
            if (this.kvindex != this.kvend) {
                // Records remain in the buffer: spill them synchronously on this thread.
                this.kvend = (this.kvindex + 4) % this.kvmeta.capacity();
                this.bufend = this.bufmark;
                if (LOG.isInfoEnabled()) {
                    LOG.info(this.outputContext.getDestinationVertexName() + ": " + "Sorting & Spilling map output. " + "bufstart = " + this.bufstart + ", bufend = " + this.bufmark + ", bufvoid = " + this.bufvoid + "; " + "kvstart=" + this.kvstart + "(" + this.kvstart * 4 + ")" + ", kvend = " + this.kvend + "(" + this.kvend * 4 + ")" + ", length = " + (this.distanceTo(this.kvend, this.kvstart, this.kvmeta.capacity()) + 1) + "/" + this.maxRec);
                }
                this.sortAndSpill();
            }
        }
        catch (InterruptedException e) {
            // Wrapping the exception would otherwise lose the interrupt; restore it for callers.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for the writer", e);
        }
        finally {
            this.spillLock.unlock();
        }
        assert (!this.spillLock.isHeldByCurrentThread());
        try {
            // Stop the spill thread and wait for it to exit.
            this.spillThread.interrupt();
            this.spillThread.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Spill failed", e);
        }
        this.mergeParts();
        if (this.isFinalMergeEnabled()) {
            this.fileOutputByteCounter.increment(this.rfs.getFileStatus(this.finalOutputFile).getLen());
        }
    }

    /**
     * Intentionally empty: this method performs no work. The spill thread is stopped and
     * the output is finalized in {@code flush()}.
     */
    @Override
    public void close() throws IOException {
    }

    /**
     * Rethrows a failure recorded in {@code sortSpillException}, if there is one. An
     * {@link Error} is additionally reported to the output context as fatal.
     *
     * @throws IOException wrapping the recorded failure
     */
    private void checkSpillException() throws IOException {
        final Throwable failure = this.sortSpillException;
        if (failure == null) {
            return;
        }
        if (failure instanceof Error) {
            final String logMsg = "Task " + this.outputContext.getUniqueIdentifier() + " failed : " + ExceptionUtils.getStackTrace((Throwable)failure);
            this.outputContext.fatalError(failure, logMsg);
        }
        throw new IOException("Spill failed", failure);
    }

    /**
     * Freezes the current buffer contents as the spill window, marks a spill as in
     * progress and signals {@code spillReady}. Must not be called while a spill is running.
     */
    private void startSpill() {
        assert (!this.spillInProgress);
        final int metaCapacity = this.kvmeta.capacity();
        // The window ends at the most recently written metadata record and at the last marked byte.
        this.kvend = (this.kvindex + 4) % metaCapacity;
        this.bufend = this.bufmark;
        this.spillInProgress = true;
        if (LOG.isInfoEnabled()) {
            final int windowLength = this.distanceTo(this.kvend, this.kvstart, metaCapacity) + 1;
            LOG.info(this.outputContext.getDestinationVertexName() + ": Spilling map output." + "bufstart=" + this.bufstart + ", bufend = " + this.bufmark + ", bufvoid = " + this.bufvoid + "; kvstart=" + this.kvstart + "(" + this.kvstart * 4 + ")" + ", kvend = " + this.kvend + "(" + this.kvend * 4 + ")" + ", length = " + windowLength + "/" + this.maxRec);
        }
        this.spillReady.signal();
    }

    /**
     * @return record position of the first record in the spill window ({@code kvend / 4})
     */
    int getMetaStart() {
        final int windowEndIndex = this.kvend;
        return windowEndIndex / 4;
    }

    /**
     * @return exclusive end record position of the spill window; {@code kvstart} is first
     *         lifted by the metadata capacity when the window wraps ({@code kvstart < kvend})
     */
    int getMetaEnd() {
        int upperIndex = this.kvstart;
        if (upperIndex < this.kvend) {
            upperIndex = this.kvmeta.capacity() + this.kvstart;
        }
        return 1 + upperIndex / 4;
    }

    /**
     * @return {@code true} when more than 10% of the collected keys compared equal, or when
     *         the equal-key counter has gone negative
     */
    private boolean isRLENeeded() {
        if (this.sameKey < 0L) {
            return true;
        }
        final double threshold = 0.1 * (double)this.totalKeys;
        return (double)this.sameKey > threshold;
    }

    /**
     * Sorts the metadata of the current spill window and writes the window to a spill file.
     *
     * @throws IOException if writing the spill fails
     * @throws InterruptedException if the spill is interrupted
     */
    protected void sortAndSpill() throws IOException, InterruptedException {
        final int firstRecord = this.getMetaStart();
        final int endRecord = this.getMetaEnd();
        this.sorter.sort((IndexedSortable)this, firstRecord, endRecord, this.nullProgressable);
        this.spill(firstRecord, endRecord);
    }

    /**
     * Updates the byte counters after one partition segment has been written.
     *
     * @param rawLen uncompressed length of the segment
     * @param compLength compressed length of the segment
     */
    private void adjustSpillCounters(long rawLen, long compLength) {
        if (this.isFinalMergeEnabled() && this.numSpills > 0) {
            // With a final merge pending, every spill after the first counts as additional spill bytes.
            this.additionalSpillBytesWritten.increment(compLength);
            this.outputBytesWithOverheadCounter.setValue(0L);
            return;
        }
        this.outputBytesWithOverheadCounter.increment(rawLen);
    }

    /**
     * Writes the sorted records at positions {@code [mstart, mend)} to a new spill file,
     * one IFile segment per partition, and records the segment index either in the
     * in-memory index cache or, once the cache limit is reached, in an index file.
     *
     * @param mstart first record position to write
     * @param mend exclusive end record position
     * @throws IOException if creating or writing the spill or index file fails
     * @throws InterruptedException if the spill is interrupted
     */
    protected void spill(int mstart, int mend) throws IOException, InterruptedException {
        // Serialized bytes between bufstart and bufend. When the window wraps, that is the tail
        // of the buffer after bufstart plus the head before bufend. Computed in long so the
        // estimate cannot overflow for large buffers and partition counts.
        long dataBytes = this.bufend >= this.bufstart
                ? (long)this.bufend - (long)this.bufstart
                : (long)this.bufvoid - (long)this.bufstart + (long)this.bufend;
        // 150 bytes per partition approximates the segment headers (APPROX_HEADER_LENGTH).
        long size = dataBytes + (long)this.partitions * 150L;
        FSDataOutputStream out = null;
        try {
            TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
            Path filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, size);
            this.spillFilePaths.put(this.numSpills, filename);
            out = this.rfs.create(filename);
            int spindex = mstart;
            InMemValBytes value = this.createInMemValBytes();
            boolean rle = this.isRLENeeded();
            for (int i = 0; i < this.partitions; ++i) {
                IFile.Writer writer = null;
                try {
                    long segmentStart = out.getPos();
                    writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null, rle);
                    if (this.combiner == null) {
                        // No combiner: copy this partition's records straight from the buffer.
                        DataInputBuffer key = new DataInputBuffer();
                        while (spindex < mend && this.kvmeta.get(this.offsetFor(spindex) + 2) == i) {
                            int kvoff = this.offsetFor(spindex);
                            int keystart = this.kvmeta.get(kvoff + 1);
                            int valstart = this.kvmeta.get(kvoff + 0);
                            key.reset(this.kvbuffer, keystart, valstart - keystart);
                            this.getVBytesForOffset(kvoff, value);
                            writer.append(key, value);
                            ++spindex;
                        }
                    } else {
                        // Find the end of this partition's run, then feed the run through the combiner.
                        int spstart = spindex;
                        while (spindex < mend && this.kvmeta.get(this.offsetFor(spindex) + 2) == i) {
                            ++spindex;
                        }
                        if (spstart != spindex) {
                            MRResultIterator kvIter = new MRResultIterator(spstart, spindex);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug(this.outputContext.getDestinationVertexName() + ": " + "Running combine processor");
                            }
                            this.runCombineProcessor(kvIter, writer);
                        }
                    }
                    writer.close();
                    this.adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
                    TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                    spillRec.putIndex(rec, i);
                    // Closed successfully: nothing left for the finally block to do.
                    writer = null;
                }
                finally {
                    if (null != writer) {
                        writer.close();
                    }
                }
            }
            if (this.totalIndexCacheMemory >= this.indexCacheMemoryLimit) {
                // Index cache is full: persist this spill's index (24 bytes per partition) to a file.
                Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
                this.spillFileIndexPaths.put(this.numSpills, indexFilename);
                spillRec.writeToFile(indexFilename, this.conf);
            } else {
                this.indexCacheList.add(spillRec);
                this.totalIndexCacheMemory += spillRec.size() * 24;
            }
            LOG.info(this.outputContext.getDestinationVertexName() + ": " + "Finished spill " + this.numSpills);
            ++this.numSpills;
            if (!this.isFinalMergeEnabled()) {
                this.numShuffleChunks.setValue((long)this.numSpills);
            } else if (this.numSpills > 1) {
                this.numAdditionalSpills.increment(1L);
            }
        }
        finally {
            // The stream is created inside the try block, so it may still be null here.
            if (out != null) {
                out.close();
            }
        }
    }

    /**
     * Writes a single record that is too large for the in-memory buffer directly to its own
     * spill file. Every partition gets a segment; only {@code partition}'s segment contains
     * the record.
     *
     * @param key record key
     * @param value record value
     * @param partition partition the record belongs to
     * @throws IOException if creating or writing the spill or index file fails
     */
    private void spillSingleRecord(Object key, Object value, int partition) throws IOException {
        // Size hint: the whole buffer plus 150 bytes of header per partition, computed in long
        // so a large buffer and partition count cannot overflow.
        long size = (long)this.kvbuffer.length + (long)this.partitions * 150L;
        FSDataOutputStream out = null;
        try {
            TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
            Path filename = this.mapOutputFile.getSpillFileForWrite(this.numSpills, size);
            this.spillFilePaths.put(this.numSpills, filename);
            out = this.rfs.create(filename);
            for (int i = 0; i < this.partitions; ++i) {
                IFile.Writer writer = null;
                try {
                    long segmentStart = out.getPos();
                    writer = new IFile.Writer(this.conf, out, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null);
                    if (i == partition) {
                        long recordStart = out.getPos();
                        writer.append(key, value);
                        // Count the bytes this record actually added to the stream.
                        this.mapOutputByteCounter.increment(out.getPos() - recordStart);
                    }
                    writer.close();
                    this.adjustSpillCounters(writer.getRawLength(), writer.getCompressedLength());
                    TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                    spillRec.putIndex(rec, i);
                    writer = null;
                }
                catch (IOException e) {
                    if (null != writer) {
                        writer.close();
                    }
                    throw e;
                }
            }
            if (this.totalIndexCacheMemory >= this.indexCacheMemoryLimit) {
                // Index cache is full: persist this spill's index (24 bytes per partition) to a file.
                Path indexFilename = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, this.partitions * 24);
                this.spillFileIndexPaths.put(this.numSpills, indexFilename);
                spillRec.writeToFile(indexFilename, this.conf);
            } else {
                this.indexCacheList.add(spillRec);
                this.totalIndexCacheMemory += spillRec.size() * 24;
            }
            ++this.numSpills;
            if (!this.isFinalMergeEnabled()) {
                this.numShuffleChunks.setValue((long)this.numSpills);
            } else if (this.numSpills > 1) {
                this.numAdditionalSpills.increment(1L);
            }
        }
        finally {
            // The stream is created inside the try block, so it may still be null here.
            if (out != null) {
                out.close();
            }
        }
    }

    /**
     * Reads the stored value length of a buffered record.
     *
     * @param kvoff int offset of the record's metadata
     * @return value length in bytes (asserted to be non-negative)
     */
    protected int getInMemVBytesLength(int kvoff) {
        final int storedLength = this.kvmeta.get(kvoff + 3);
        assert (storedLength >= 0);
        return storedLength;
    }

    /**
     * Points {@code vbytes} at the value bytes of a buffered record.
     *
     * @param kvoff int offset of the record's metadata
     * @param vbytes holder to reset onto the value's bytes in the sort buffer
     * @return value length in bytes
     */
    int getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
        final int valueLength = this.getInMemVBytesLength(kvoff);
        final int valueStart = this.kvmeta.get(kvoff + 0);
        vbytes.reset(this.kvbuffer, valueStart, valueLength);
        return valueLength;
    }

    /**
     * @return a new value holder constructed with the current {@code bufvoid}
     */
    InMemValBytes createInMemValBytes() {
        final InMemValBytes holder = new InMemValBytes(this.bufvoid);
        return holder;
    }

    /**
     * Generates the events for one spill and optionally sends them. Does nothing when the
     * final merge is enabled.
     *
     * @param events list the generated events are added to
     * @param isLastEvent whether this is the final update for the output
     * @param spillRecord index record of the spill; must not be null
     * @param index spill id
     * @param sendEvent whether to send {@code events} to the output context right away
     * @throws IOException if generating the events fails
     */
    private void maybeSendEventForSpill(List<Event> events, boolean isLastEvent, TezSpillRecord spillRecord, int index, boolean sendEvent) throws IOException {
        if (!this.isFinalMergeEnabled()) {
            Preconditions.checkArgument(spillRecord != null, (Object)"Spill record can not be null");
            final String pathComponent = this.outputContext.getUniqueIdentifier() + "_" + index;
            ShuffleUtils.generateEventOnSpill(events, this.isFinalMergeEnabled(), isLastEvent, this.outputContext, index, spillRecord, this.partitions, this.sendEmptyPartitionDetails, pathComponent);
            LOG.info(this.outputContext.getDestinationVertexName() + ": " + "Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + index);
            if (sendEvent) {
                this.outputContext.sendEvents(events);
            }
        }
    }

    /**
     * Generates events for every spill and sends them in one batch. Does nothing when the
     * final merge is enabled.
     *
     * @throws IOException if reading or writing a spill index, or querying a spill file, fails
     */
    private void maybeAddEventsForSpills() throws IOException {
        if (this.isFinalMergeEnabled()) {
            return;
        }
        List<Event> events = Lists.newLinkedList();
        int spillId = 0;
        while (spillId < this.numSpills) {
            TezSpillRecord spillRecord = this.indexCacheList.get(spillId);
            if (spillRecord == null) {
                // Not cached: load the index from its file.
                spillRecord = new TezSpillRecord((Path)this.spillFileIndexPaths.get(spillId), this.conf);
            } else if (this.spillFileIndexPaths.get(spillId) == null) {
                // Cached only: write the index out to a file.
                Path indexPath = this.mapOutputFile.getSpillIndexFileForWrite(spillId, this.partitions * 24);
                spillRecord.writeToFile(indexPath, this.conf);
            }
            boolean lastSpill = spillId == this.numSpills - 1;
            this.maybeSendEventForSpill(events, lastSpill, spillRecord, spillId, false);
            this.fileOutputByteCounter.increment(this.rfs.getFileStatus((Path)this.spillFilePaths.get(spillId)).getLen());
            ++spillId;
        }
        this.outputContext.sendEvents(events);
    }

    /**
     * Produces the final output from the spill files:
     * <ul>
     *   <li>one spill: renamed into place (final merge enabled) or announced as-is;</li>
     *   <li>several spills without final merge: one event per spill is sent;</li>
     *   <li>no spills: an output with an empty segment per partition is written;</li>
     *   <li>otherwise: the spills are merged partition by partition into one file and deleted.</li>
     * </ul>
     *
     * @throws IOException if any file system operation or the merge itself fails
     */
    private void mergeParts() throws IOException {
        long finalOutFileSize = 0L;
        long finalIndexFileSize = 0L;
        Path[] filename = new Path[this.numSpills];
        String taskIdentifier = this.outputContext.getUniqueIdentifier();
        for (int i = 0; i < this.numSpills; ++i) {
            filename[i] = (Path)this.spillFilePaths.get(i);
            finalOutFileSize += this.rfs.getFileStatus(filename[i]).getLen();
        }
        if (this.numSpills == 1) {
            if (this.isFinalMergeEnabled()) {
                // A single spill already is the final output: rename it into place.
                this.finalOutputFile = this.mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
                this.finalIndexFile = this.mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
                this.sameVolRename(filename[0], this.finalOutputFile);
                if (this.indexCacheList.size() == 0) {
                    this.sameVolRename((Path)this.spillFileIndexPaths.get(0), this.finalIndexFile);
                } else {
                    this.indexCacheList.get(0).writeToFile(this.finalIndexFile, this.conf);
                }
            } else {
                List<Event> events = Lists.newLinkedList();
                TezSpillRecord spillRecord = this.indexCacheList.get(0);
                Path indexPath = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills - 1, this.partitions * 24);
                spillRecord.writeToFile(indexPath, this.conf);
                this.maybeSendEventForSpill(events, true, spillRecord, 0, true);
                this.fileOutputByteCounter.increment(this.rfs.getFileStatus((Path)this.spillFilePaths.get(0)).getLen());
            }
            this.numShuffleChunks.setValue((long)this.numSpills);
            return;
        }
        // Load every index record that was written to a file instead of being cached.
        for (int i = this.indexCacheList.size(); i < this.numSpills; ++i) {
            Path indexFileName = (Path)this.spillFileIndexPaths.get(i);
            this.indexCacheList.add(new TezSpillRecord(indexFileName, this.conf));
        }
        if (this.numSpills > 0 && !this.isFinalMergeEnabled()) {
            this.maybeAddEventsForSpills();
            return;
        }
        // Size hints: 150 bytes of header and 24 bytes of index per partition.
        finalOutFileSize += (long)(this.partitions * 150);
        finalIndexFileSize = this.partitions * 24;
        if (this.isFinalMergeEnabled()) {
            this.finalOutputFile = this.mapOutputFile.getOutputFileForWrite(finalOutFileSize);
            this.finalIndexFile = this.mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
        } else if (this.numSpills == 0) {
            this.finalOutputFile = this.mapOutputFile.getSpillFileForWrite(this.numSpills, finalOutFileSize);
            this.finalIndexFile = this.mapOutputFile.getSpillIndexFileForWrite(this.numSpills, finalIndexFileSize);
        }
        FSDataOutputStream finalOut = this.rfs.create(this.finalOutputFile, true, 4096);
        if (this.numSpills == 0) {
            // Nothing was collected: write an empty segment for every partition.
            TezSpillRecord sr = new TezSpillRecord(this.partitions);
            try {
                for (int part = 0; part < this.partitions; ++part) {
                    long segmentStart = finalOut.getPos();
                    IFile.Writer writer = new IFile.Writer(this.conf, finalOut, this.keyClass, this.valClass, this.codec, null, null);
                    writer.close();
                    TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                    this.outputBytesWithOverheadCounter.increment(writer.getRawLength());
                    sr.putIndex(rec, part);
                }
                sr.writeToFile(this.finalIndexFile, this.conf);
            }
            finally {
                finalOut.close();
            }
            ++this.numSpills;
            if (!this.isFinalMergeEnabled()) {
                List<Event> events = Lists.newLinkedList();
                this.maybeSendEventForSpill(events, true, sr, 0, true);
                this.fileOutputByteCounter.increment(this.rfs.getFileStatus(this.finalOutputFile).getLen());
            }
            this.numShuffleChunks.setValue((long)this.numSpills);
            return;
        }
        TezSpillRecord spillRec = new TezSpillRecord(this.partitions);
        try {
            for (int parts = 0; parts < this.partitions; ++parts) {
                // Collect this partition's segment from every spill file.
                ArrayList<TezMerger.Segment> segmentList = new ArrayList<TezMerger.Segment>(this.numSpills);
                for (int spill = 0; spill < this.numSpills; ++spill) {
                    TezIndexRecord indexRecord = this.indexCacheList.get(spill).getIndex(parts);
                    TezMerger.DiskSegment s = new TezMerger.DiskSegment(this.rfs, filename[spill], indexRecord.getStartOffset(), indexRecord.getPartLength(), this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, true);
                    segmentList.add(spill, s);
                    if (!LOG.isDebugEnabled()) continue;
                    LOG.debug(this.outputContext.getDestinationVertexName() + ": " + "TaskIdentifier=" + taskIdentifier + " Partition=" + parts + "Spill =" + spill + "(" + indexRecord.getStartOffset() + "," + indexRecord.getRawLength() + ", " + indexRecord.getPartLength() + ")");
                }
                int mergeFactor = this.conf.getInt("tez.runtime.io.sort.factor", 100);
                // Segments are sorted only when they cannot all be merged in a single pass.
                boolean sortSegments = segmentList.size() > mergeFactor;
                TezRawKeyValueIterator kvIter = TezMerger.merge(this.conf, this.rfs, this.keyClass, this.valClass, this.codec, segmentList, mergeFactor, new Path(taskIdentifier), ConfigUtils.getIntermediateOutputKeyComparator(this.conf), this.nullProgressable, sortSegments, true, null, this.spilledRecordsCounter, this.additionalSpillBytesRead, null);
                long segmentStart = finalOut.getPos();
                IFile.Writer writer = new IFile.Writer(this.conf, finalOut, this.keyClass, this.valClass, this.codec, this.spilledRecordsCounter, null);
                if (this.combiner == null || this.numSpills < this.minSpillsForCombine) {
                    TezMerger.writeFile(kvIter, writer, this.nullProgressable, 10000L);
                } else {
                    this.runCombineProcessor(kvIter, writer);
                }
                writer.close();
                this.outputBytesWithOverheadCounter.increment(writer.getRawLength());
                TezIndexRecord rec = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
                spillRec.putIndex(rec, parts);
            }
            this.numShuffleChunks.setValue(1L);
            spillRec.writeToFile(this.finalIndexFile, this.conf);
        }
        finally {
            // Close the output even if the merge fails, as the zero-spill branch above does.
            finalOut.close();
        }
        // The merged output is complete; the individual spill files are no longer needed.
        for (int i = 0; i < this.numSpills; ++i) {
            this.rfs.delete(filename[i], true);
        }
    }

    /**
     * Compiler-generated accessor exposing a sorter's {@code outputContext}
     * field to code that cannot read the field directly.
     *
     * @param x0 the sorter whose output context is requested
     * @return the {@code outputContext} held by {@code x0}
     */
    static /* synthetic */ OutputContext access$500(final DefaultSorter x0) {
        final OutputContext context = x0.outputContext;
        return context;
    }

    /**
     * Compiler-generated accessor for the class-wide logger.
     *
     * @return the {@code LOG} instance of {@code DefaultSorter}
     */
    static /* synthetic */ Logger access$600() {
        return DefaultSorter.LOG;
    }

    /**
     * Key/value iterator over a run of records held in the sorter's in-memory
     * buffer. Records are addressed by index in the half-open range
     * {@code [start, end)}; each index is translated to a metadata offset via
     * {@code offsetFor}.
     */
    protected class MRResultIterator
    implements TezRawKeyValueIterator {
        private final DataInputBuffer keybuf = new DataInputBuffer();
        private final InMemValBytes vbytes = DefaultSorter.this.createInMemValBytes();
        private final int end;
        private int current;

        /**
         * @param start index of the first record to return
         * @param end index one past the last record to return
         */
        public MRResultIterator(int start, int end) {
            // Park the cursor one slot before the first record; next() steps onto it.
            this.current = start - 1;
            this.end = end;
        }

        /**
         * Advances the cursor by one record.
         *
         * @return {@code true} while the cursor is still before {@code end}
         */
        @Override
        public boolean next() throws IOException {
            current += 1;
            return current < end;
        }

        /**
         * Repositions the shared key buffer over the current record's key.
         *
         * @return the reused key buffer; its contents change on the next call
         */
        @Override
        public DataInputBuffer getKey() throws IOException {
            final DefaultSorter sorter = DefaultSorter.this;
            final int metaBase = sorter.offsetFor(current);
            // Metadata slot +1 supplies where the key begins; slot +0 supplies the
            // position just past it (presumably the value's start offset).
            final int keyStart = sorter.kvmeta.get(metaBase + 1);
            final int keyLimit = sorter.kvmeta.get(metaBase);
            keybuf.reset(sorter.kvbuffer, keyStart, keyLimit - keyStart);
            return keybuf;
        }

        /**
         * Repositions the shared value buffer over the current record's value.
         *
         * @return the reused value buffer; its contents change on the next call
         */
        @Override
        public DataInputBuffer getValue() throws IOException {
            final int metaBase = DefaultSorter.this.offsetFor(current);
            DefaultSorter.this.getVBytesForOffset(metaBase, vbytes);
            return vbytes;
        }

        /** @return always {@code null}; this iterator does not report progress */
        @Override
        public Progress getProgress() {
            return null;
        }

        /** @return always {@code false}; adjacent keys are never reported as equal */
        @Override
        public boolean isSameKey() throws IOException {
            return false;
        }

        /** Nothing to release. */
        @Override
        public void close() {
        }
    }

    /**
     * {@link DataInputBuffer} over a value stored in a byte array that is used
     * circularly: a value may begin near {@code bufvoid} and continue from
     * offset 0. Such a value is copied into a private contiguous array before
     * being exposed to readers.
     */
    static class InMemValBytes
    extends DataInputBuffer {
        private byte[] buffer;
        private int start;
        private int length;
        private final int bufvoid;

        /**
         * @param bufvoid offset at which the backing array wraps around to 0
         */
        public InMemValBytes(int bufvoid) {
            this.bufvoid = bufvoid;
        }

        /**
         * Points this buffer at {@code length} bytes beginning at {@code start}.
         * When the range extends past {@code bufvoid}, the tail
         * {@code [start, bufvoid)} and the wrapped head are stitched together
         * into a freshly allocated array, which is then used instead of
         * {@code buffer}.
         *
         * @param buffer backing array holding the value bytes
         * @param start offset of the first value byte in {@code buffer}
         * @param length number of value bytes
         */
        public void reset(byte[] buffer, int start, int length) {
            this.buffer = buffer;
            this.start = start;
            this.length = length;
            // Widen before adding: with a backing array above 1 GiB the int sum
            // can overflow to a negative number and hide a wrapped value.
            if ((long)start + (long)length > (long)this.bufvoid) {
                this.buffer = new byte[this.length];
                int taillen = this.bufvoid - start;
                System.arraycopy(buffer, start, this.buffer, 0, taillen);
                System.arraycopy(buffer, 0, this.buffer, taillen, length - taillen);
                this.start = 0;
            }
            super.reset(this.buffer, this.start, this.length);
        }
    }

    protected class SpillThread
    extends Thread {
        protected SpillThread() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Unable to fully structure code
         */
        @Override
        public void run() {
            DefaultSorter.this.spillLock.lock();
            try {
                try {
                    DefaultSorter.this.spillThreadRunning = true;
                    while (true) lbl-1000:
                    // 4 sources

                    {
                        DefaultSorter.this.spillDone.signal();
                        while (!DefaultSorter.this.spillInProgress) {
                            DefaultSorter.this.spillReady.await();
                        }
                        try {
                            DefaultSorter.this.spillLock.unlock();
                            DefaultSorter.this.sortAndSpill();
                        }
                        catch (Throwable t) {
                            DefaultSorter.access$600().warn(DefaultSorter.access$500(DefaultSorter.this).getDestinationVertexName() + ": " + "Got an exception in sortAndSpill", t);
                            DefaultSorter.this.sortSpillException = t;
                        }
                        finally {
                            DefaultSorter.this.spillLock.lock();
                            if (DefaultSorter.this.bufend < DefaultSorter.this.bufstart) {
                                DefaultSorter.this.bufvoid = DefaultSorter.access$000(DefaultSorter.this).length;
                            }
                            DefaultSorter.this.kvstart = DefaultSorter.this.kvend;
                            DefaultSorter.this.bufstart = DefaultSorter.this.bufend;
                            DefaultSorter.this.spillInProgress = false;
                            continue;
                        }
                        break;
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    DefaultSorter.this.spillLock.unlock();
                    DefaultSorter.this.spillThreadRunning = false;
                }
                ** GOTO lbl-1000
            }
            catch (Throwable var3_4) {
                DefaultSorter.this.spillLock.unlock();
                DefaultSorter.this.spillThreadRunning = false;
                throw var3_4;
            }
        }
    }

    /**
     * Output stream that appends bytes to the sorter's circular
     * {@code kvbuffer} at {@code bufindex}. When the remaining-space budget is
     * used up it coordinates with the spill thread under {@code spillLock}.
     */
    public class Buffer
    extends OutputStream {
        private final byte[] scratch = new byte[1];

        /**
         * Writes a single byte by delegating to the array overload.
         *
         * @param v the byte to write (low eight bits are used)
         */
        @Override
        public void write(int v) throws IOException {
            this.scratch[0] = (byte)v;
            this.write(this.scratch, 0, 1);
        }

        /**
         * Copies {@code len} bytes from {@code b} into {@code kvbuffer},
         * wrapping to offset 0 when the write would pass {@code bufvoid}.
         *
         * <p>{@code len} is first charged against {@code bufferRemaining}. If
         * the budget is exhausted, the method takes {@code spillLock} and loops
         * until the write no longer has to block. Inside that loop it may
         * reclaim space from a finished spill, start a new spill, or wait on
         * {@code spillDone}.
         *
         * @param b source array
         * @param off offset of the first byte to copy from {@code b}
         * @param len number of bytes to copy
         * @throws ExternalSorter.MapBufferTooSmallException if nothing is
         *         buffered that could be spilled and the write still does not fit
         * @throws IOException if interrupted while waiting for a spill, or if
         *         {@code checkSpillException()} throws
         */
        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            DefaultSorter.this.bufferRemaining -= len;
            if (DefaultSorter.this.bufferRemaining <= 0) {
                boolean blockwrite = false;
                DefaultSorter.this.spillLock.lock();
                try {
                    do {
                        DefaultSorter.this.checkSpillException();
                        // Byte positions of the metadata cursors (4 bytes per int slot).
                        final int kvbidx = 4 * DefaultSorter.this.kvindex;
                        final int kvbend = 4 * DefaultSorter.this.kvend;
                        final int distkvi = DefaultSorter.this.distanceTo(DefaultSorter.this.bufindex, kvbidx);
                        final int distkve = DefaultSorter.this.distanceTo(DefaultSorter.this.bufindex, kvbend);
                        // Decide whether this write must block. Both arms have to
                        // feed blockwrite: if kvindex is the nearer boundary, block
                        // when the gap cannot hold the payload plus 32 bytes;
                        // otherwise block when the gap to kvend cannot hold the
                        // payload or fewer than 32 bytes separate bufend from
                        // kvindex. (32 is presumably two 16-byte metadata records;
                        // confirm against the sorter's metadata-size constant.)
                        blockwrite = distkvi <= distkve
                            ? distkvi <= len + 32
                            : distkve <= len || DefaultSorter.this.distanceTo(DefaultSorter.this.bufend, kvbidx) < 32;
                        if (!DefaultSorter.this.spillInProgress && blockwrite) {
                            if ((kvbend + 16) % DefaultSorter.this.kvbuffer.length != DefaultSorter.this.equator - DefaultSorter.this.equator % 16) {
                                // kvend is no longer at the equator: reclaim the
                                // spilled space, recompute the budget and re-check.
                                DefaultSorter.this.resetSpill();
                                DefaultSorter.this.bufferRemaining = Math.min(distkvi - 32, DefaultSorter.this.softLimit - DefaultSorter.this.distanceTo(kvbidx, DefaultSorter.this.bufindex)) - len;
                                continue;
                            }
                            if (DefaultSorter.this.kvindex != DefaultSorter.this.kvend) {
                                // Records are buffered: spill them and restart the
                                // layout at the start of the record being written.
                                DefaultSorter.this.startSpill();
                                DefaultSorter.this.setEquator(DefaultSorter.this.bufmark);
                            } else {
                                // Nothing buffered, yet the write does not fit:
                                // reset the buffer to empty and report the size.
                                final int size = DefaultSorter.this.distanceTo(DefaultSorter.this.bufstart, DefaultSorter.this.bufindex) + len;
                                DefaultSorter.this.setEquator(0);
                                DefaultSorter.this.bufend = DefaultSorter.this.bufindex = DefaultSorter.this.equator;
                                DefaultSorter.this.bufstart = DefaultSorter.this.bufindex;
                                DefaultSorter.this.kvstart = DefaultSorter.this.kvend = DefaultSorter.this.kvindex;
                                DefaultSorter.this.bufvoid = DefaultSorter.this.kvbuffer.length;
                                throw new ExternalSorter.MapBufferTooSmallException(size + " bytes");
                            }
                        }
                        if (blockwrite) {
                            // Wait for the running spill to finish, then re-evaluate.
                            try {
                                while (DefaultSorter.this.spillInProgress) {
                                    DefaultSorter.this.spillDone.await();
                                }
                            }
                            catch (InterruptedException e) {
                                throw new IOException("Buffer interrupted while waiting for the writer", e);
                            }
                        }
                    } while (blockwrite);
                }
                finally {
                    DefaultSorter.this.spillLock.unlock();
                }
            }
            // Computed as long so the sum cannot overflow for very large buffers.
            final long futureBufIndex = (long)DefaultSorter.this.bufindex + (long)len;
            if (futureBufIndex > (long)DefaultSorter.this.bufvoid) {
                // Fill up to bufvoid first, then continue from offset 0.
                int gaplen = DefaultSorter.this.bufvoid - DefaultSorter.this.bufindex;
                System.arraycopy(b, off, DefaultSorter.this.kvbuffer, DefaultSorter.this.bufindex, gaplen);
                len -= gaplen;
                off += gaplen;
                DefaultSorter.this.bufindex = 0;
            }
            System.arraycopy(b, off, DefaultSorter.this.kvbuffer, DefaultSorter.this.bufindex, len);
            DefaultSorter.this.bufindex += len;
        }
    }

    /**
     * {@link DataOutputStream} layered over a {@link Buffer}, adding
     * record-boundary bookkeeping on the sorter's circular byte buffer.
     */
    protected class BlockingBuffer
    extends DataOutputStream {
        public BlockingBuffer() {
            super(new Buffer());
        }

        /**
         * Records the current write position as the latest record boundary.
         *
         * @return the write position that was stored in {@code bufmark}
         */
        public int markRecord() {
            final DefaultSorter sorter = DefaultSorter.this;
            sorter.bufmark = sorter.bufindex;
            return sorter.bufindex;
        }

        /**
         * Makes the bytes written since {@code bufmark} contiguous after they
         * wrapped around the end of the buffer (the key, going by the method
         * name). {@code bufvoid} is pulled back to {@code bufmark} first. The
         * bytes are then either rearranged in place at the front of the array
         * or, if there is not enough room, re-written through the underlying
         * stream.
         *
         * @throws IOException if re-writing through the underlying stream fails
         */
        protected void shiftBufferedKey() throws IOException {
            final DefaultSorter sorter = DefaultSorter.this;
            // Bytes sitting between the record mark and the old end of the buffer.
            final int headLen = sorter.bufvoid - sorter.bufmark;
            sorter.bufvoid = sorter.bufmark;
            final int metaIndexPos = 4 * sorter.kvindex;
            final int metaEndPos = 4 * sorter.kvend;
            final int room = Math.min(sorter.distanceTo(0, metaIndexPos), sorter.distanceTo(0, metaEndPos));
            if (sorter.bufindex + headLen >= room) {
                // Not enough room at the front: stash the wrapped part, rewind the
                // cursor and push both pieces back through the stream in order.
                final byte[] wrapped = new byte[sorter.bufindex];
                System.arraycopy(sorter.kvbuffer, 0, wrapped, 0, sorter.bufindex);
                sorter.bufindex = 0;
                this.out.write(sorter.kvbuffer, sorter.bufmark, headLen);
                this.out.write(wrapped);
                return;
            }
            // In-place: slide the wrapped part right, then drop the head in front of it.
            System.arraycopy(sorter.kvbuffer, 0, sorter.kvbuffer, headLen, sorter.bufindex);
            System.arraycopy(sorter.kvbuffer, sorter.bufvoid, sorter.kvbuffer, 0, headLen);
            sorter.bufindex += headLen;
            sorter.bufferRemaining -= sorter.kvbuffer.length - sorter.bufvoid;
        }
    }
}

