package org.apache.spark.shuffle.sort;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.commons.io.output.CloseShieldOutputStream;
import org.apache.commons.io.output.CountingOutputStream;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.annotation.Private;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.internal.config.package$;
import org.apache.spark.io.CompressionCodec;
import org.apache.spark.io.CompressionCodec$;
import org.apache.spark.io.NioBufferedFileInputStream;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.annotations.VisibleForTesting;
import org.spark_project.guava.io.ByteStreams;
import org.spark_project.guava.io.Closeables;
import org.spark_project.guava.io.Files;
import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;

@Private
/* loaded from: input_file:org/apache/spark/shuffle/sort/UnsafeShuffleWriter.class */
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
    private static final Logger logger;
    private static final ClassTag<Object> OBJECT_CLASS_TAG;

    @VisibleForTesting
    static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096;
    static final int DEFAULT_INITIAL_SER_BUFFER_SIZE = 1048576;
    private final BlockManager blockManager;
    private final IndexShuffleBlockResolver shuffleBlockResolver;
    private final TaskMemoryManager memoryManager;
    private final SerializerInstance serializer;
    private final Partitioner partitioner;
    private final ShuffleWriteMetrics writeMetrics;
    private final int shuffleId;
    private final int mapId;
    private final TaskContext taskContext;
    private final SparkConf sparkConf;
    private final boolean transferToEnabled;
    private final int initialSortBufferSize;
    private final int inputBufferSizeInBytes;
    private final int outputBufferSizeInBytes;

    @Nullable
    private MapStatus mapStatus;

    @Nullable
    private ShuffleExternalSorter sorter;
    private MyByteArrayOutputStream serBuffer;
    private SerializationStream serOutputStream;
    static final /* synthetic */ boolean $assertionsDisabled;
    private long peakMemoryUsedBytes = 0;
    private boolean stopping = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/shuffle/sort/UnsafeShuffleWriter$CloseAndFlushShieldOutputStream.class */
    public class CloseAndFlushShieldOutputStream extends CloseShieldOutputStream {
        CloseAndFlushShieldOutputStream(OutputStream outputStream) {
            super(outputStream);
        }

        public void flush() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/shuffle/sort/UnsafeShuffleWriter$MyByteArrayOutputStream.class */
    public static final class MyByteArrayOutputStream extends ByteArrayOutputStream {
        MyByteArrayOutputStream(int i) {
            super(i);
        }

        public byte[] getBuf() {
            return this.buf;
        }
    }

    public UnsafeShuffleWriter(BlockManager blockManager, IndexShuffleBlockResolver indexShuffleBlockResolver, TaskMemoryManager taskMemoryManager, SerializedShuffleHandle<K, V> serializedShuffleHandle, int i, TaskContext taskContext, SparkConf sparkConf) throws IOException {
        if (serializedShuffleHandle.dependency().partitioner().numPartitions() > SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE()) {
            throw new IllegalArgumentException("UnsafeShuffleWriter can only be used for shuffles with at most " + SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE() + " reduce partitions");
        }
        this.blockManager = blockManager;
        this.shuffleBlockResolver = indexShuffleBlockResolver;
        this.memoryManager = taskMemoryManager;
        this.mapId = i;
        ShuffleDependency<K, V, V> dependency = serializedShuffleHandle.dependency();
        this.shuffleId = dependency.shuffleId();
        this.serializer = dependency.serializer().newInstance();
        this.partitioner = dependency.partitioner();
        this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
        this.taskContext = taskContext;
        this.sparkConf = sparkConf;
        this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
        this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize", 4096);
        this.inputBufferSizeInBytes = ((int) ((Long) sparkConf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE())).longValue()) * 1024;
        this.outputBufferSizeInBytes = ((int) ((Long) sparkConf.get(package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE())).longValue()) * 1024;
        open();
    }

    private void updatePeakMemoryUsed() {
        if (this.sorter != null) {
            long peakMemoryUsedBytes = this.sorter.getPeakMemoryUsedBytes();
            if (peakMemoryUsedBytes > this.peakMemoryUsedBytes) {
                this.peakMemoryUsedBytes = peakMemoryUsedBytes;
            }
        }
    }

    public long getPeakMemoryUsedBytes() {
        updatePeakMemoryUsed();
        return this.peakMemoryUsedBytes;
    }

    @VisibleForTesting
    public void write(Iterator<Product2<K, V>> it) throws IOException {
        write((scala.collection.Iterator) JavaConverters.asScalaIteratorConverter(it).asScala());
    }

    @Override // org.apache.spark.shuffle.ShuffleWriter
    public void write(scala.collection.Iterator<Product2<K, V>> iterator) throws IOException {
        boolean z = false;
        while (iterator.hasNext()) {
            try {
                insertRecordIntoSorter(iterator.mo706next());
            } catch (Throwable th) {
                if (this.sorter != null) {
                    try {
                        this.sorter.cleanupResources();
                    } catch (Exception e) {
                        if (z) {
                            throw e;
                        }
                        logger.error("In addition to a failure during writing, we failed during cleanup.", e);
                    }
                }
                throw th;
            }
        }
        closeAndWriteOutput();
        z = true;
        if (this.sorter != null) {
            try {
                this.sorter.cleanupResources();
            } catch (Exception e2) {
                if (1 != 0) {
                    throw e2;
                }
                logger.error("In addition to a failure during writing, we failed during cleanup.", e2);
            }
        }
    }

    private void open() {
        if (!$assertionsDisabled && this.sorter != null) {
            throw new AssertionError();
        }
        this.sorter = new ShuffleExternalSorter(this.memoryManager, this.blockManager, this.taskContext, this.initialSortBufferSize, this.partitioner.numPartitions(), this.sparkConf, this.writeMetrics);
        this.serBuffer = new MyByteArrayOutputStream(1048576);
        this.serOutputStream = this.serializer.serializeStream(this.serBuffer);
    }

    @VisibleForTesting
    void closeAndWriteOutput() throws IOException {
        if (!$assertionsDisabled && this.sorter == null) {
            throw new AssertionError();
        }
        updatePeakMemoryUsed();
        this.serBuffer = null;
        this.serOutputStream = null;
        SpillInfo[] closeAndGetSpills = this.sorter.closeAndGetSpills();
        this.sorter = null;
        File tempFileWith = Utils.tempFileWith(this.shuffleBlockResolver.getDataFile(this.shuffleId, this.mapId));
        try {
            try {
                long[] mergeSpills = mergeSpills(closeAndGetSpills, tempFileWith);
                for (SpillInfo spillInfo : closeAndGetSpills) {
                    if (spillInfo.file.exists() && !spillInfo.file.delete()) {
                        logger.error("Error while deleting spill file {}", spillInfo.file.getPath());
                    }
                }
                this.shuffleBlockResolver.writeIndexFileAndCommit(this.shuffleId, this.mapId, mergeSpills, tempFileWith);
                if (tempFileWith.exists() && !tempFileWith.delete()) {
                    logger.error("Error while deleting temp file {}", tempFileWith.getAbsolutePath());
                }
                this.mapStatus = MapStatus$.MODULE$.apply(this.blockManager.shuffleServerId(), mergeSpills);
            } catch (Throwable th) {
                for (SpillInfo spillInfo2 : closeAndGetSpills) {
                    if (spillInfo2.file.exists() && !spillInfo2.file.delete()) {
                        logger.error("Error while deleting spill file {}", spillInfo2.file.getPath());
                    }
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (tempFileWith.exists() && !tempFileWith.delete()) {
                logger.error("Error while deleting temp file {}", tempFileWith.getAbsolutePath());
            }
            throw th2;
        }
    }

    @VisibleForTesting
    void insertRecordIntoSorter(Product2<K, V> product2) throws IOException {
        if (!$assertionsDisabled && this.sorter == null) {
            throw new AssertionError();
        }
        K mo12499_1 = product2.mo12499_1();
        int partition = this.partitioner.getPartition(mo12499_1);
        this.serBuffer.reset();
        this.serOutputStream.writeKey(mo12499_1, OBJECT_CLASS_TAG);
        this.serOutputStream.writeValue(product2.mo12498_2(), OBJECT_CLASS_TAG);
        this.serOutputStream.flush();
        int size = this.serBuffer.size();
        if (!$assertionsDisabled && size <= 0) {
            throw new AssertionError();
        }
        this.sorter.insertRecord(this.serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, size, partition);
    }

    @VisibleForTesting
    void forceSorterToSpill() throws IOException {
        if (!$assertionsDisabled && this.sorter == null) {
            throw new AssertionError();
        }
        this.sorter.spill();
    }

    private long[] mergeSpills(SpillInfo[] spillInfoArr, File file) throws IOException {
        long[] mergeSpillsWithFileStream;
        boolean z = this.sparkConf.getBoolean("spark.shuffle.compress", true);
        CompressionCodec createCodec = CompressionCodec$.MODULE$.createCodec(this.sparkConf);
        boolean z2 = this.sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);
        boolean z3 = !z || CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(createCodec);
        boolean encryptionEnabled = this.blockManager.serializerManager().encryptionEnabled();
        try {
            if (spillInfoArr.length == 0) {
                new FileOutputStream(file).close();
                return new long[this.partitioner.numPartitions()];
            }
            if (spillInfoArr.length == 1) {
                Files.move(spillInfoArr[0].file, file);
                return spillInfoArr[0].partitionLengths;
            }
            if (!z2 || !z3) {
                logger.debug("Using slow merge");
                mergeSpillsWithFileStream = mergeSpillsWithFileStream(spillInfoArr, file, createCodec);
            } else if (!this.transferToEnabled || encryptionEnabled) {
                logger.debug("Using fileStream-based fast merge");
                mergeSpillsWithFileStream = mergeSpillsWithFileStream(spillInfoArr, file, null);
            } else {
                logger.debug("Using transferTo-based fast merge");
                mergeSpillsWithFileStream = mergeSpillsWithTransferTo(spillInfoArr, file);
            }
            this.writeMetrics.decBytesWritten(spillInfoArr[spillInfoArr.length - 1].file.length());
            this.writeMetrics.incBytesWritten(file.length());
            return mergeSpillsWithFileStream;
        } catch (IOException e) {
            if (file.exists() && !file.delete()) {
                logger.error("Unable to delete output file {}", file.getPath());
            }
            throw e;
        }
    }

    /* JADX WARN: Finally extract failed */
    private long[] mergeSpillsWithFileStream(SpillInfo[] spillInfoArr, File file, @Nullable CompressionCodec compressionCodec) throws IOException {
        if (!$assertionsDisabled && spillInfoArr.length < 2) {
            throw new AssertionError();
        }
        int numPartitions = this.partitioner.numPartitions();
        long[] jArr = new long[numPartitions];
        InputStream[] inputStreamArr = new InputStream[spillInfoArr.length];
        CountingOutputStream countingOutputStream = new CountingOutputStream(new BufferedOutputStream(new FileOutputStream(file), this.outputBufferSizeInBytes));
        for (int i = 0; i < spillInfoArr.length; i++) {
            try {
                inputStreamArr[i] = new NioBufferedFileInputStream(spillInfoArr[i].file, this.inputBufferSizeInBytes);
            } catch (Throwable th) {
                for (InputStream inputStream : inputStreamArr) {
                    Closeables.close(inputStream, true);
                }
                Closeables.close(countingOutputStream, true);
                throw th;
            }
        }
        for (int i2 = 0; i2 < numPartitions; i2++) {
            long byteCount = countingOutputStream.getByteCount();
            OutputStream wrapForEncryption = this.blockManager.serializerManager().wrapForEncryption((OutputStream) new CloseAndFlushShieldOutputStream(new TimeTrackingOutputStream(this.writeMetrics, countingOutputStream)));
            if (compressionCodec != null) {
                wrapForEncryption = compressionCodec.compressedOutputStream(wrapForEncryption);
            }
            for (int i3 = 0; i3 < spillInfoArr.length; i3++) {
                long j = spillInfoArr[i3].partitionLengths[i2];
                if (j > 0) {
                    InputStream limitedInputStream = new LimitedInputStream(inputStreamArr[i3], j, false);
                    try {
                        limitedInputStream = this.blockManager.serializerManager().wrapForEncryption(limitedInputStream);
                        if (compressionCodec != null) {
                            limitedInputStream = compressionCodec.compressedInputStream(limitedInputStream);
                        }
                        ByteStreams.copy(limitedInputStream, wrapForEncryption);
                        limitedInputStream.close();
                    } catch (Throwable th2) {
                        limitedInputStream.close();
                        throw th2;
                    }
                }
            }
            wrapForEncryption.flush();
            wrapForEncryption.close();
            jArr[i2] = countingOutputStream.getByteCount() - byteCount;
        }
        for (InputStream inputStream2 : inputStreamArr) {
            Closeables.close(inputStream2, false);
        }
        Closeables.close(countingOutputStream, false);
        return jArr;
    }

    private long[] mergeSpillsWithTransferTo(SpillInfo[] spillInfoArr, File file) throws IOException {
        if (!$assertionsDisabled && spillInfoArr.length < 2) {
            throw new AssertionError();
        }
        int numPartitions = this.partitioner.numPartitions();
        long[] jArr = new long[numPartitions];
        FileChannel[] fileChannelArr = new FileChannel[spillInfoArr.length];
        long[] jArr2 = new long[spillInfoArr.length];
        FileChannel fileChannel = null;
        for (int i = 0; i < spillInfoArr.length; i++) {
            try {
                fileChannelArr[i] = new FileInputStream(spillInfoArr[i].file).getChannel();
            } catch (Throwable th) {
                for (int i2 = 0; i2 < spillInfoArr.length; i2++) {
                    if (!$assertionsDisabled && jArr2[i2] != spillInfoArr[i2].file.length()) {
                        throw new AssertionError();
                    }
                    Closeables.close(fileChannelArr[i2], true);
                }
                Closeables.close(fileChannel, true);
                throw th;
            }
        }
        fileChannel = new FileOutputStream(file, true).getChannel();
        long j = 0;
        for (int i3 = 0; i3 < numPartitions; i3++) {
            for (int i4 = 0; i4 < spillInfoArr.length; i4++) {
                long j2 = spillInfoArr[i4].partitionLengths[i3];
                FileChannel fileChannel2 = fileChannelArr[i4];
                long nanoTime = System.nanoTime();
                Utils.copyFileStreamNIO(fileChannel2, fileChannel, jArr2[i4], j2);
                int i5 = i4;
                jArr2[i5] = jArr2[i5] + j2;
                this.writeMetrics.incWriteTime(System.nanoTime() - nanoTime);
                j += j2;
                int i6 = i3;
                jArr[i6] = jArr[i6] + j2;
            }
        }
        if (fileChannel.position() != j) {
            throw new IOException("Current position " + fileChannel.position() + " does not equal expected position " + j + " after transferTo. Please check your kernel version to see if it is 2.6.32, as there is a kernel bug which will lead to unexpected behavior when using transferTo. You can set spark.file.transferTo=false to disable this NIO feature.");
        }
        for (int i7 = 0; i7 < spillInfoArr.length; i7++) {
            if (!$assertionsDisabled && jArr2[i7] != spillInfoArr[i7].file.length()) {
                throw new AssertionError();
            }
            Closeables.close(fileChannelArr[i7], false);
        }
        Closeables.close(fileChannel, false);
        return jArr;
    }

    @Override // org.apache.spark.shuffle.ShuffleWriter
    public Option<MapStatus> stop(boolean z) {
        try {
            this.taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());
            if (this.stopping) {
                return Option.apply(null);
            }
            this.stopping = true;
            if (!z) {
                Option<MapStatus> apply = Option.apply(null);
                if (this.sorter != null) {
                    this.sorter.cleanupResources();
                }
                return apply;
            }
            if (this.mapStatus == null) {
                throw new IllegalStateException("Cannot call stop(true) without having called write()");
            }
            Option<MapStatus> apply2 = Option.apply(this.mapStatus);
            if (this.sorter != null) {
                this.sorter.cleanupResources();
            }
            return apply2;
        } finally {
            if (this.sorter != null) {
                this.sorter.cleanupResources();
            }
        }
    }

    static {
        $assertionsDisabled = !UnsafeShuffleWriter.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(UnsafeShuffleWriter.class);
        OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
    }
}
