package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.ConfigUtils;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezMerger;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.hadoop.compat.NullProgressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.class */
public class MergeManager {
    private static final Logger LOG = LoggerFactory.getLogger(MergeManager.class);
    private final Configuration conf;
    private final FileSystem localFS;
    private final FileSystem rfs;
    private final LocalDirAllocator localDirAllocator;
    private final TezTaskOutputFiles mapOutputFile;
    private final Combiner combiner;
    private final IntermediateMemoryToMemoryMerger memToMemMerger;
    private final InMemoryMerger inMemoryMerger;

    @VisibleForTesting
    final OnDiskMerger onDiskMerger;
    private final long memoryLimit;

    @VisibleForTesting
    final long postMergeMemLimit;
    private long usedMemory;
    private long commitMemory;
    private final int ioSortFactor;
    private final long maxSingleShuffleLimit;
    private final int memToMemMergeOutputsThreshold;
    private final long mergeThreshold;
    private final long initialMemoryAvailable;
    private final ExceptionReporter exceptionReporter;
    private final InputContext inputContext;
    private final TezCounter spilledRecordsCounter;
    private final TezCounter reduceCombineInputCounter;
    private final TezCounter mergedMapOutputsCounter;
    private final TezCounter numMemToDiskMerges;
    private final TezCounter numDiskToDiskMerges;
    private final TezCounter additionalBytesWritten;
    private final TezCounter additionalBytesRead;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    private final int ifileBufferSize;
    private final Progressable nullProgressable = new NullProgressable();
    private final Set<MapOutput> inMemoryMergedMapOutputs = new TreeSet(new MapOutput.MapOutputComparator());
    private final Set<MapOutput> inMemoryMapOutputs = new TreeSet(new MapOutput.MapOutputComparator());

    @VisibleForTesting
    final Set<FileChunk> onDiskMapOutputs = new TreeSet();
    private volatile boolean finalMergeComplete = false;
    private AtomicInteger mergeFileSequenceId = new AtomicInteger(0);
    private final MapOutput stallShuffle = MapOutput.createWaitMapOutput(null);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager$InMemoryMerger.class */
    public class InMemoryMerger extends MergeThread<MapOutput> {
        public InMemoryMerger(MergeManager mergeManager) {
            super(mergeManager, Integer.MAX_VALUE, MergeManager.this.exceptionReporter);
            setName("MemtoDiskMerger [" + TezUtilsInternal.cleanVertexName(MergeManager.this.inputContext.getSourceVertexName()) + "]");
            setDaemon(true);
        }

        @Override // org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeThread
        public void merge(List<MapOutput> list) throws IOException, InterruptedException {
            if (list == null || list.size() == 0) {
                return;
            }
            MergeManager.this.numMemToDiskMerges.increment(1L);
            InputAttemptIdentifier attemptIdentifier = list.get(0).getAttemptIdentifier();
            ArrayList arrayList = new ArrayList();
            long createInMemorySegments = MergeManager.this.createInMemorySegments(list, arrayList, 0L);
            int size = arrayList.size();
            Path suffix = MergeManager.this.mapOutputFile.getInputFileForWrite(attemptIdentifier.getInputIdentifier().getInputIndex(), attemptIdentifier.getSpillEventId(), createInMemorySegments).suffix(Constants.MERGED_OUTPUT_PREFIX);
            IFile.Writer writer = null;
            try {
                try {
                    IFile.Writer writer2 = new IFile.Writer(MergeManager.this.conf, MergeManager.this.rfs, suffix, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, (TezCounter) null, (TezCounter) null);
                    MergeManager.LOG.info("Initiating in-memory merge with " + size + " segments...");
                    TezRawKeyValueIterator merge = TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), arrayList, arrayList.size(), new Path(MergeManager.this.inputContext.getUniqueIdentifier()), ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.nullProgressable, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.additionalBytesRead, null);
                    if (null == MergeManager.this.combiner) {
                        TezMerger.writeFile(merge, writer2, MergeManager.this.nullProgressable, 10000L);
                    } else {
                        MergeManager.this.runCombineProcessor(merge, writer2);
                    }
                    writer2.close();
                    MergeManager.this.additionalBytesWritten.increment(writer2.getCompressedLength());
                    writer = null;
                    long len = MergeManager.this.localFS.getFileStatus(suffix).getLen();
                    MergeManager.LOG.info(MergeManager.this.inputContext.getUniqueIdentifier() + " Merge of the " + size + " files in-memory complete. Local file is " + suffix + " of size " + len);
                    if (0 != 0) {
                        writer.close();
                    }
                    MergeManager.this.closeOnDiskFile(new FileChunk(suffix, 0L, len));
                } catch (IOException e) {
                    MergeManager.this.localFS.delete(suffix, true);
                    throw e;
                }
            } catch (Throwable th) {
                if (writer != null) {
                    writer.close();
                }
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager$IntermediateMemoryToMemoryMerger.class */
    public class IntermediateMemoryToMemoryMerger extends MergeThread<MapOutput> {
        public IntermediateMemoryToMemoryMerger(MergeManager mergeManager, int i) {
            super(mergeManager, i, MergeManager.this.exceptionReporter);
            setName("MemToMemMerger [" + TezUtilsInternal.cleanVertexName(MergeManager.this.inputContext.getSourceVertexName()) + "]");
            setDaemon(true);
        }

        @Override // org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeThread
        public void merge(List<MapOutput> list) throws IOException {
            if (list == null || list.size() == 0) {
                return;
            }
            InputAttemptIdentifier attemptIdentifier = list.get(0).getAttemptIdentifier();
            ArrayList arrayList = new ArrayList();
            long createInMemorySegments = MergeManager.this.createInMemorySegments(list, arrayList, 0L);
            int size = arrayList.size();
            MapOutput unconditionalReserve = MergeManager.this.unconditionalReserve(attemptIdentifier, createInMemorySegments, false);
            InMemoryWriter inMemoryWriter = new InMemoryWriter(unconditionalReserve.getArrayStream());
            MergeManager.LOG.info(MergeManager.this.inputContext.getSourceVertexName() + ": Initiating Memory-to-Memory merge with " + size + " segments of total-size: " + createInMemorySegments);
            TezMerger.writeFile(TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), arrayList, arrayList.size(), new Path(MergeManager.this.inputContext.getUniqueIdentifier()), ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.nullProgressable, null, null, null, null), inMemoryWriter, MergeManager.this.nullProgressable, 10000L);
            inMemoryWriter.close();
            MergeManager.LOG.info(MergeManager.this.inputContext.getSourceVertexName() + " Memory-to-Memory merge of the " + size + " files in-memory complete.");
            MergeManager.this.closeInMemoryMergedFile(unconditionalReserve);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager$OnDiskMerger.class */
    public class OnDiskMerger extends MergeThread<FileChunk> {
        public OnDiskMerger(MergeManager mergeManager) {
            super(mergeManager, MergeManager.this.ioSortFactor, MergeManager.this.exceptionReporter);
            setName("DiskToDiskMerger [" + TezUtilsInternal.cleanVertexName(MergeManager.this.inputContext.getSourceVertexName()) + "]");
            setDaemon(true);
        }

        @Override // org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeThread
        public void merge(List<FileChunk> list) throws IOException {
            if (list == null || list.isEmpty()) {
                MergeManager.LOG.info("No ondisk files to merge...");
                return;
            }
            MergeManager.this.numDiskToDiskMerges.increment(1L);
            long j = 0;
            int i = MergeManager.this.conf.getInt("io.bytes.per.checksum", 512);
            MergeManager.LOG.info("OnDiskMerger: We have  " + list.size() + " map outputs on disk. Triggering merge...");
            ArrayList arrayList = new ArrayList(list.size());
            for (FileChunk fileChunk : list) {
                long offset = fileChunk.getOffset();
                long length = fileChunk.getLength();
                boolean isLocalFile = fileChunk.isLocalFile();
                if (MergeManager.LOG.isDebugEnabled()) {
                    MergeManager.LOG.debug("InputAttemptIdentifier=" + fileChunk.getInputAttemptIdentifier() + ", len=" + fileChunk.getLength() + ", offset=" + fileChunk.getOffset() + ", path=" + fileChunk.getPath());
                }
                j += length;
                arrayList.add(new TezMerger.Segment(MergeManager.this.rfs, fileChunk.getPath(), offset, length, MergeManager.this.codec, MergeManager.this.ifileReadAhead, MergeManager.this.ifileReadAheadLength, MergeManager.this.ifileBufferSize, isLocalFile));
            }
            long checksumLength = j + ChecksumFileSystem.getChecksumLength(j, i);
            FileChunk fileChunk2 = list.get(0);
            Path suffix = MergeManager.this.localDirAllocator.getLocalPathForWrite(FilenameUtils.removeExtension(fileChunk2.isLocalFile() ? MergeManager.this.mapOutputFile.getSpillFileName(fileChunk2.getInputAttemptIdentifier().getInputIdentifier().getInputIndex(), fileChunk2.getInputAttemptIdentifier().getSpillEventId()) : fileChunk2.getPath().getName().toString()), checksumLength, MergeManager.this.conf).suffix(Constants.MERGED_OUTPUT_PREFIX + MergeManager.this.mergeFileSequenceId.getAndIncrement());
            IFile.Writer writer = new IFile.Writer(MergeManager.this.conf, MergeManager.this.rfs, suffix, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), MergeManager.this.codec, (TezCounter) null, (TezCounter) null);
            try {
                TezMerger.writeFile(TezMerger.merge(MergeManager.this.conf, MergeManager.this.rfs, ConfigUtils.getIntermediateInputKeyClass(MergeManager.this.conf), ConfigUtils.getIntermediateInputValueClass(MergeManager.this.conf), arrayList, MergeManager.this.ioSortFactor, new Path(MergeManager.this.inputContext.getUniqueIdentifier()), ConfigUtils.getIntermediateInputKeyComparator(MergeManager.this.conf), MergeManager.this.nullProgressable, true, MergeManager.this.spilledRecordsCounter, null, MergeManager.this.mergedMapOutputsCounter, null), writer, MergeManager.this.nullProgressable, 10000L);
                writer.close();
                MergeManager.this.additionalBytesWritten.increment(writer.getCompressedLength());
                long len = MergeManager.this.localFS.getFileStatus(suffix).getLen();
                MergeManager.this.closeOnDiskFile(new FileChunk(suffix, 0L, len));
                MergeManager.LOG.info(MergeManager.this.inputContext.getSourceVertexName() + " Finished merging " + list.size() + " map output files on disk of total-size " + checksumLength + ". Local output file is " + suffix + " of size " + len);
            } catch (IOException e) {
                MergeManager.this.localFS.delete(suffix, true);
                throw e;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager$RawKVIteratorReader.class */
    public class RawKVIteratorReader extends IFile.Reader {
        private final TezRawKeyValueIterator kvIter;

        public RawKVIteratorReader(TezRawKeyValueIterator tezRawKeyValueIterator, long j) throws IOException {
            super((InputStream) null, j, (CompressionCodec) null, MergeManager.this.spilledRecordsCounter, (TezCounter) null, MergeManager.this.ifileReadAhead, MergeManager.this.ifileReadAheadLength, MergeManager.this.ifileBufferSize);
            this.kvIter = tezRawKeyValueIterator;
        }

        @Override // org.apache.tez.runtime.library.common.sort.impl.IFile.Reader
        public IFile.Reader.KeyState readRawKey(DataInputBuffer dataInputBuffer) throws IOException {
            if (!this.kvIter.next()) {
                return IFile.Reader.KeyState.NO_KEY;
            }
            DataInputBuffer key = this.kvIter.getKey();
            int position = key.getPosition();
            int length = key.getLength() - position;
            dataInputBuffer.reset(key.getData(), position, length);
            this.bytesRead += length;
            return IFile.Reader.KeyState.NEW_KEY;
        }

        @Override // org.apache.tez.runtime.library.common.sort.impl.IFile.Reader
        public void nextRawValue(DataInputBuffer dataInputBuffer) throws IOException {
            DataInputBuffer value = this.kvIter.getValue();
            int position = value.getPosition();
            int length = value.getLength() - position;
            dataInputBuffer.reset(value.getData(), position, length);
            this.bytesRead += length;
        }

        @Override // org.apache.tez.runtime.library.common.sort.impl.IFile.Reader
        public long getPosition() throws IOException {
            return this.bytesRead;
        }

        @Override // org.apache.tez.runtime.library.common.sort.impl.IFile.Reader
        public void close() throws IOException {
            this.kvIter.close();
        }
    }

    public MergeManager(Configuration configuration, FileSystem fileSystem, LocalDirAllocator localDirAllocator, InputContext inputContext, Combiner combiner, TezCounter tezCounter, TezCounter tezCounter2, TezCounter tezCounter3, ExceptionReporter exceptionReporter, long j, CompressionCodec compressionCodec, boolean z, int i) {
        this.inputContext = inputContext;
        this.conf = configuration;
        this.localDirAllocator = localDirAllocator;
        this.exceptionReporter = exceptionReporter;
        this.initialMemoryAvailable = j;
        this.combiner = combiner;
        this.reduceCombineInputCounter = tezCounter2;
        this.spilledRecordsCounter = tezCounter;
        this.mergedMapOutputsCounter = tezCounter3;
        this.mapOutputFile = new TezTaskOutputFiles(configuration, inputContext.getUniqueIdentifier());
        this.localFS = fileSystem;
        this.rfs = ((LocalFileSystem) fileSystem).getRaw();
        this.numDiskToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_DISK_TO_DISK_MERGES);
        this.numMemToDiskMerges = inputContext.getCounters().findCounter(TaskCounter.NUM_MEM_TO_DISK_MERGES);
        this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        this.codec = compressionCodec;
        this.ifileReadAhead = z;
        if (this.ifileReadAhead) {
            this.ifileReadAheadLength = i;
        } else {
            this.ifileReadAheadLength = 0;
        }
        this.ifileBufferSize = configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE, -1);
        float f = configuration.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.9f);
        if (f > 1.0d || f < 0.0d) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.fetch.buffer.percent: " + f);
        }
        long j2 = configuration.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, ((float) inputContext.getTotalMemoryAvailableToTask()) * f);
        float f2 = configuration.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
        if (f2 > 1.0d || f2 < 0.0d) {
            throw new TezUncheckedException(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + f2);
        }
        long totalMemoryAvailableToTask = ((float) inputContext.getTotalMemoryAvailableToTask()) * f2;
        if (this.initialMemoryAvailable < j2) {
            this.memoryLimit = this.initialMemoryAvailable;
        } else {
            this.memoryLimit = j2;
        }
        if (this.initialMemoryAvailable < totalMemoryAvailableToTask) {
            this.postMergeMemLimit = this.initialMemoryAvailable;
        } else {
            this.postMergeMemLimit = totalMemoryAvailableToTask;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(inputContext.getSourceVertexName() + ": InitialRequest: ShuffleMem=" + j2 + ", postMergeMem=" + totalMemoryAvailableToTask + ", RuntimeTotalAvailable=" + this.initialMemoryAvailable + ". Updated to: ShuffleMem=" + this.memoryLimit + ", postMergeMem: " + this.postMergeMemLimit);
        }
        this.ioSortFactor = configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 100);
        float f3 = configuration.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT, 0.25f);
        if (f3 <= TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT || f3 > 1.0f) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.memory.limit.percent: " + f3);
        }
        this.maxSingleShuffleLimit = Math.min(((float) this.memoryLimit) * f3, 2.1474836E9f);
        this.memToMemMergeOutputsThreshold = configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS, this.ioSortFactor);
        this.mergeThreshold = ((float) this.memoryLimit) * configuration.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT, 0.9f);
        LOG.info(inputContext.getSourceVertexName() + ": MergerManager: memoryLimit=" + this.memoryLimit + ", maxSingleShuffleLimit=" + this.maxSingleShuffleLimit + ", mergeThreshold=" + this.mergeThreshold + ", ioSortFactor=" + this.ioSortFactor + ", postMergeMem=" + this.postMergeMemLimit + ", memToMemMergeOutputsThreshold=" + this.memToMemMergeOutputsThreshold);
        if (this.maxSingleShuffleLimit >= this.mergeThreshold) {
            throw new RuntimeException("Invlaid configuration: maxSingleShuffleLimit should be less than mergeThresholdmaxSingleShuffleLimit: " + this.maxSingleShuffleLimit + ", mergeThreshold: " + this.mergeThreshold);
        }
        if (configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM, false)) {
            this.memToMemMerger = new IntermediateMemoryToMemoryMerger(this, this.memToMemMergeOutputsThreshold);
        } else {
            this.memToMemMerger = null;
        }
        this.inMemoryMerger = new InMemoryMerger(this);
        this.onDiskMerger = new OnDiskMerger(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @InterfaceAudience.Private
    public void configureAndStart() {
        if (this.memToMemMerger != null) {
            this.memToMemMerger.start();
        }
        this.inMemoryMerger.start();
        this.onDiskMerger.start();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @InterfaceAudience.Private
    public static long getInitialMemoryRequirement(Configuration configuration, long j) {
        float f = configuration.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT, 0.9f);
        if (f > 1.0d || f < 0.0d) {
            throw new IllegalArgumentException("Invalid value for tez.runtime.shuffle.fetch.buffer.percent: " + f);
        }
        long j2 = configuration.getLong(Constants.TEZ_RUNTIME_TASK_MEMORY, ((float) j) * f);
        float f2 = configuration.getFloat(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT, TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT);
        if (f2 > 1.0d || f2 < 0.0d) {
            throw new TezUncheckedException(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT + f2);
        }
        long j3 = ((float) j) * f2;
        LOG.info("Initial Memory required for SHUFFLE_BUFFER=" + j2 + " based on INPUT_BUFFER_FACTOR=" + f + ",  for final merged output=" + j3 + ", using factor: " + f2);
        return Math.max(j3, j2);
    }

    public void waitForInMemoryMerge() throws InterruptedException {
        this.inMemoryMerger.waitForMerge();
        boolean z = false;
        synchronized (this) {
            if (this.commitMemory >= this.mergeThreshold) {
                startMemToDiskMerge();
                z = true;
            }
        }
        if (z) {
            this.inMemoryMerger.waitForMerge();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Additional in-memory merge triggered");
            }
        }
    }

    private boolean canShuffleToMemory(long j) {
        return j < this.maxSingleShuffleLimit;
    }

    public synchronized void waitForShuffleToMergeMemory() throws InterruptedException {
        long currentTimeMillis = System.currentTimeMillis();
        while (this.usedMemory > this.memoryLimit) {
            wait();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Waited for " + (System.currentTimeMillis() - currentTimeMillis) + " for memory to become available");
        }
    }

    public synchronized MapOutput reserve(InputAttemptIdentifier inputAttemptIdentifier, long j, long j2, int i) throws IOException {
        if (!canShuffleToMemory(j)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(inputAttemptIdentifier + ": Shuffling to disk since " + j + " is greater than maxSingleShuffleLimit (" + this.maxSingleShuffleLimit + ")");
            }
            return MapOutput.createDiskMapOutput(inputAttemptIdentifier, this, j2, this.conf, i, true, this.mapOutputFile);
        }
        if (this.usedMemory > this.memoryLimit) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(inputAttemptIdentifier + ": Stalling shuffle since usedMemory (" + this.usedMemory + ") is greater than memoryLimit (" + this.memoryLimit + "). CommitMemory is (" + this.commitMemory + ")");
            }
            return this.stallShuffle;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug(inputAttemptIdentifier + ": Proceeding with shuffle since usedMemory (" + this.usedMemory + ") is lesser than memoryLimit (" + this.memoryLimit + ").CommitMemory is (" + this.commitMemory + ")");
        }
        return unconditionalReserve(inputAttemptIdentifier, j, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized MapOutput unconditionalReserve(InputAttemptIdentifier inputAttemptIdentifier, long j, boolean z) throws IOException {
        this.usedMemory += j;
        return MapOutput.createMemoryMapOutput(inputAttemptIdentifier, this, (int) j, z);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized void unreserve(long j) {
        this.commitMemory -= j;
        this.usedMemory -= j;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Notifying unreserve : commitMemory=" + this.commitMemory + ", usedMemory=" + this.usedMemory + ", mergeThreshold=" + this.mergeThreshold);
        }
        notifyAll();
    }

    public synchronized void closeInMemoryFile(MapOutput mapOutput) {
        this.inMemoryMapOutputs.add(mapOutput);
        LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize() + ", inMemoryMapOutputs.size() -> " + this.inMemoryMapOutputs.size() + ", commitMemory -> " + this.commitMemory + ", usedMemory ->" + this.usedMemory + ", mapOutput=" + mapOutput);
        this.commitMemory += mapOutput.getSize();
        if (this.commitMemory >= this.mergeThreshold) {
            startMemToDiskMerge();
        }
        if (this.memToMemMerger != null) {
            synchronized (this.memToMemMerger) {
                if (!this.memToMemMerger.isInProgress() && this.inMemoryMapOutputs.size() >= this.memToMemMergeOutputsThreshold) {
                    this.memToMemMerger.startMerge(this.inMemoryMapOutputs);
                }
            }
        }
    }

    private void startMemToDiskMerge() {
        synchronized (this.inMemoryMerger) {
            if (!this.inMemoryMerger.isInProgress()) {
                LOG.info(this.inputContext.getSourceVertexName() + ": Starting inMemoryMerger's merge since commitMemory=" + this.commitMemory + " > mergeThreshold=" + this.mergeThreshold + ". Current usedMemory=" + this.usedMemory);
                this.inMemoryMapOutputs.addAll(this.inMemoryMergedMapOutputs);
                this.inMemoryMergedMapOutputs.clear();
                this.inMemoryMerger.startMerge(this.inMemoryMapOutputs);
            }
        }
    }

    public synchronized void closeInMemoryMergedFile(MapOutput mapOutput) {
        this.inMemoryMergedMapOutputs.add(mapOutput);
        LOG.info("closeInMemoryMergedFile -> size: " + mapOutput.getSize() + ", inMemoryMergedMapOutputs.size() -> " + this.inMemoryMergedMapOutputs.size());
    }

    public synchronized void closeOnDiskFile(FileChunk fileChunk) {
        for (FileChunk fileChunk2 : this.onDiskMapOutputs) {
            if (fileChunk2.getPath().equals(fileChunk.getPath())) {
                Preconditions.checkArgument(fileChunk2.getOffset() != fileChunk.getOffset(), "Can't have a file with same path and offset.OldFilePath=" + fileChunk2.getPath() + ", OldFileOffset=" + fileChunk2.getOffset() + ", newFilePath=" + fileChunk.getPath() + ", newFileOffset=" + fileChunk.getOffset());
            }
        }
        this.onDiskMapOutputs.add(fileChunk);
        if (LOG.isDebugEnabled()) {
            LOG.debug("close onDiskFile=" + fileChunk.getPath() + ", len=" + fileChunk.getLength());
        }
        synchronized (this.onDiskMerger) {
            if (!this.onDiskMerger.isInProgress() && this.onDiskMapOutputs.size() >= (2 * this.ioSortFactor) - 1) {
                this.onDiskMerger.startMerge(this.onDiskMapOutputs);
            }
        }
    }

    @InterfaceAudience.Private
    public boolean isMergeComplete() {
        return this.finalMergeComplete;
    }

    public TezRawKeyValueIterator close() throws Throwable {
        if (this.memToMemMerger != null) {
            this.memToMemMerger.close();
        }
        this.inMemoryMerger.close();
        this.onDiskMerger.close();
        ArrayList arrayList = new ArrayList(this.inMemoryMergedMapOutputs);
        this.inMemoryMergedMapOutputs.clear();
        arrayList.addAll(this.inMemoryMapOutputs);
        this.inMemoryMapOutputs.clear();
        ArrayList arrayList2 = new ArrayList(this.onDiskMapOutputs);
        this.onDiskMapOutputs.clear();
        TezRawKeyValueIterator finalMerge = finalMerge(this.conf, this.rfs, arrayList, arrayList2);
        this.finalMergeComplete = true;
        return finalMerge;
    }

    void runCombineProcessor(TezRawKeyValueIterator tezRawKeyValueIterator, IFile.Writer writer) throws IOException, InterruptedException {
        this.combiner.combine(tezRawKeyValueIterator, writer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long createInMemorySegments(List<MapOutput> list, List<TezMerger.Segment> list2, long j) throws IOException {
        long j2 = 0;
        long j3 = 0;
        while (list.iterator().hasNext()) {
            j3 += r0.next().getMemory().length;
        }
        while (j3 > j) {
            MapOutput remove = list.remove(0);
            byte[] memory = remove.getMemory();
            long length = memory.length;
            j2 += length;
            j3 -= length;
            list2.add(new TezMerger.Segment(new InMemoryReader(this, remove.getAttemptIdentifier(), memory, 0, (int) length), true, remove.isPrimaryMapOutput() ? this.mergedMapOutputsCounter : null));
        }
        return j2;
    }

    /* JADX WARN: Finally extract failed */
    private TezRawKeyValueIterator finalMerge(Configuration configuration, FileSystem fileSystem, List<MapOutput> list, List<FileChunk> list2) throws IOException {
        LOG.info("finalMerge called with " + list.size() + " in-memory map-outputs and " + list2.size() + " on-disk map-outputs");
        if (LOG.isDebugEnabled()) {
            for (MapOutput mapOutput : list) {
                LOG.debug("inMemoryOutput=" + mapOutput + ", size=" + mapOutput.getSize());
            }
            for (FileChunk fileChunk : list2) {
                LOG.debug("onDiskMapOutput=" + fileChunk.getPath() + ", size=" + fileChunk.getLength());
            }
        }
        Class intermediateInputKeyClass = ConfigUtils.getIntermediateInputKeyClass(configuration);
        Class intermediateInputValueClass = ConfigUtils.getIntermediateInputValueClass(configuration);
        Path path = new Path(this.inputContext.getUniqueIdentifier());
        RawComparator intermediateInputKeyComparator = ConfigUtils.getIntermediateInputKeyComparator(configuration);
        ArrayList arrayList = new ArrayList();
        long j = 0;
        if (list.size() > 0) {
            int inputIndex = list.get(0).getAttemptIdentifier().getInputIdentifier().getInputIndex();
            j = createInMemorySegments(list, arrayList, this.postMergeMemLimit);
            int size = arrayList.size();
            if (size > 0 && this.ioSortFactor > list2.size()) {
                Path suffix = this.mapOutputFile.getInputFileForWrite(inputIndex, Integer.MAX_VALUE, j).suffix(Constants.MERGED_OUTPUT_PREFIX);
                TezRawKeyValueIterator merge = TezMerger.merge(configuration, fileSystem, intermediateInputKeyClass, intermediateInputValueClass, arrayList, size, path, intermediateInputKeyComparator, this.nullProgressable, this.spilledRecordsCounter, null, this.additionalBytesRead, null);
                IFile.Writer writer = new IFile.Writer(configuration, fileSystem, suffix, intermediateInputKeyClass, intermediateInputValueClass, this.codec, (TezCounter) null, (TezCounter) null);
                try {
                    try {
                        TezMerger.writeFile(merge, writer, this.nullProgressable, 10000L);
                        if (null != writer) {
                            writer.close();
                            this.additionalBytesWritten.increment(writer.getCompressedLength());
                        }
                        list2.add(new FileChunk(suffix, 0L, this.localFS.getFileStatus(suffix).getLen()));
                        LOG.info("Merged " + size + " segments, " + j + " bytes to disk to satisfy reduce memory limit. outputPath=" + suffix);
                        j = 0;
                        arrayList.clear();
                    } catch (IOException e) {
                        if (null != suffix) {
                            try {
                                fileSystem.delete(suffix, true);
                            } catch (IOException e2) {
                            }
                        }
                        throw e;
                    }
                } catch (Throwable th) {
                    if (null != writer) {
                        writer.close();
                        this.additionalBytesWritten.increment(writer.getCompressedLength());
                    }
                    throw th;
                }
            } else if (j != 0) {
                LOG.info("Keeping " + size + " segments, " + j + " bytes in memory for intermediate, on-disk merge");
            }
        }
        ArrayList arrayList2 = new ArrayList();
        long j2 = j;
        FileChunk[] fileChunkArr = (FileChunk[]) list2.toArray(new FileChunk[list2.size()]);
        for (FileChunk fileChunk2 : fileChunkArr) {
            long length = fileChunk2.getLength();
            j2 += length;
            LOG.info("Disk file=" + fileChunk2.getPath() + ", len=" + length + ", isLocal=" + fileChunk2.isLocalFile());
            Path path2 = fileChunk2.getPath();
            arrayList2.add(new TezMerger.Segment(fileSystem, path2, fileChunk2.getOffset(), length, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.ifileBufferSize, fileChunk2.isLocalFile(), path2.toString().endsWith(Constants.MERGED_OUTPUT_PREFIX) ? null : this.mergedMapOutputsCounter));
        }
        LOG.info("Merging " + fileChunkArr.length + " files, " + j2 + " bytes from disk");
        Collections.sort(arrayList2, new Comparator<TezMerger.Segment>() { // from class: org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager.1
            @Override // java.util.Comparator
            public int compare(TezMerger.Segment segment, TezMerger.Segment segment2) {
                if (segment.getLength() == segment2.getLength()) {
                    return 0;
                }
                return segment.getLength() < segment2.getLength() ? -1 : 1;
            }
        });
        ArrayList arrayList3 = new ArrayList();
        LOG.info("Merging " + arrayList3.size() + " segments, " + createInMemorySegments(list, arrayList3, 0L) + " bytes from memory into reduce");
        if (0 != j2) {
            int size2 = arrayList.size();
            arrayList2.addAll(0, arrayList);
            arrayList.clear();
            TezRawKeyValueIterator merge2 = TezMerger.merge(configuration, fileSystem, intermediateInputKeyClass, intermediateInputValueClass, this.codec, (List<TezMerger.Segment>) arrayList2, this.ioSortFactor, size2, path, intermediateInputKeyComparator, this.nullProgressable, false, this.spilledRecordsCounter, (TezCounter) null, this.additionalBytesRead, (Progress) null);
            arrayList2.clear();
            if (0 == arrayList3.size()) {
                return merge2;
            }
            arrayList3.add(new TezMerger.Segment(new RawKVIteratorReader(merge2, j2), true));
        }
        return TezMerger.merge(configuration, fileSystem, intermediateInputKeyClass, intermediateInputValueClass, arrayList3, arrayList3.size(), path, intermediateInputKeyComparator, this.nullProgressable, this.spilledRecordsCounter, null, this.additionalBytesRead, null);
    }
}
