package org.apache.spark.network.shuffle;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.annotations.VisibleForTesting;
import org.sparkproject.guava.base.Objects;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.cache.CacheBuilder;
import org.sparkproject.guava.cache.CacheLoader;
import org.sparkproject.guava.cache.LoadingCache;
import org.sparkproject.guava.collect.Maps;
import org.sparkproject.guava.primitives.Ints;
import org.sparkproject.guava.primitives.Longs;

/* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver.class */
public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);

    // Directory name that AppPathsInfo resolves against the parent of each registered local dir.
    @VisibleForTesting
    static final String MERGE_MANAGER_DIR = "merge_manager";
    // Transport configuration; also handed to the FileSegmentManagedBuffer instances created here.
    private final TransportConf conf;
    // Read from conf.minChunkSizeInMergedShuffleFile(); onComplete records a new chunk boundary once
    // at least this many bytes have been appended since the last recorded chunk offset.
    private final int minChunkSize;
    // Read from conf.ioExceptionsThresholdDuringMerge(); a partition whose IOException count exceeds
    // this value makes further pushes to it abort.
    private final int ioExceptionsThresholdDuringMerge;
    // Weight-bounded cache of parsed index files, keyed by the index file itself.
    private final LoadingCache<File, ShuffleIndexInformation> indexCache;
    // Partition state, keyed first by application/shuffle id and then by reduce id.
    private final ConcurrentMap<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> partitions = Maps.newConcurrentMap();
    // Merge directory layout per application id.
    private final ConcurrentMap<String, AppPathsInfo> appsPathInfo = Maps.newConcurrentMap();
    // Single-thread executor on which applicationRemoved schedules directory deletion.
    private final Executor directoryCleaner = Executors.newSingleThreadExecutor(NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
    // Consulted in PushBlockStreamCallback.onFailure to choose between error and debug logging.
    private final ErrorHandler.BlockPushErrorHandler errorHandler = new ErrorHandler.BlockPushErrorHandler();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppPathsInfo.class */
    /**
     * Merge directory layout of one application: for every local dir it was given, the sibling
     * {@code merge_manager} directory (same parent), plus the number of sub directories per local dir.
     */
    public static class AppPathsInfo {
        private final String[] activeLocalDirs;
        private final int subDirsPerLocalDir;

        /**
         * @param str application id, used only for logging
         * @param strArr local dirs; each one must have a parent directory
         * @param i number of sub directories per local dir
         */
        private AppPathsInfo(String str, String[] strArr, int i) {
            String[] mergeDirs = new String[strArr.length];
            for (int idx = 0; idx < strArr.length; idx++) {
                // Swap the last path element of the given dir for the merge manager directory name.
                Path localDir = Paths.get(strArr[idx]);
                Path mergeDir = localDir.getParent().resolve(RemoteBlockPushResolver.MERGE_MANAGER_DIR);
                mergeDirs[idx] = mergeDir.toFile().getPath();
            }
            this.activeLocalDirs = mergeDirs;
            this.subDirsPerLocalDir = i;
            // Guarded so the array is only rendered to a string when info logging is on.
            if (RemoteBlockPushResolver.logger.isInfoEnabled()) {
                RemoteBlockPushResolver.logger.info("Updated active local dirs {} and sub dirs {} for application {}", Arrays.toString(this.activeLocalDirs), i, str);
            }
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppShuffleId.class */
    /**
     * Immutable key identifying one shuffle of one application. Equality and hashing use both
     * fields, so instances are usable as map keys.
     */
    public static class AppShuffleId {
        public final String appId;
        public final int shuffleId;

        /**
         * @param str application id
         * @param i shuffle id
         */
        AppShuffleId(String str, int i) {
            this.appId = str;
            this.shuffleId = i;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AppShuffleId other = (AppShuffleId) obj;
            return this.shuffleId == other.shuffleId && Objects.equal(this.appId, other.appId);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.appId, this.shuffleId);
        }

        @Override
        public String toString() {
            // Use a plain literal for the label: borrowing the identically valued constant from the
            // YARN CLI class would tie this key type to an unrelated Hadoop client artifact.
            return Objects.toStringHelper(this).add("appId", this.appId).add("shuffleId", this.shuffleId).toString();
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$AppShufflePartitionInfo.class */
    public static class AppShufflePartitionInfo {
        private final AppShuffleId appShuffleId;
        private final int reduceId;
        public FileChannel dataChannel;
        private long dataFilePos;
        private RoaringBitmap mapTracker;
        private MergeShuffleFile indexFile;
        private MergeShuffleFile metaFile;
        private long lastChunkOffset;
        private RoaringBitmap chunkTracker;
        private boolean indexMetaUpdateFailed;
        private int lastMergedMapIndex = -1;
        private int numIOExceptions = 0;
        private int currentMapIndex = -1;

        AppShufflePartitionInfo(AppShuffleId appShuffleId, int i, File file, MergeShuffleFile mergeShuffleFile, MergeShuffleFile mergeShuffleFile2) throws IOException {
            this.appShuffleId = (AppShuffleId) Preconditions.checkNotNull(appShuffleId, "app shuffle id");
            this.reduceId = i;
            this.dataChannel = new FileOutputStream(file).getChannel();
            this.indexFile = mergeShuffleFile;
            this.metaFile = mergeShuffleFile2;
            updateChunkInfo(0L, -1);
            this.dataFilePos = 0L;
            this.mapTracker = new RoaringBitmap();
            this.chunkTracker = new RoaringBitmap();
        }

        public long getDataFilePos() {
            return this.dataFilePos;
        }

        public void setDataFilePos(long j) {
            RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId), Long.valueOf(this.dataFilePos), Long.valueOf(j)});
            this.dataFilePos = j;
        }

        int getCurrentMapIndex() {
            return this.currentMapIndex;
        }

        void setCurrentMapIndex(int i) {
            RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current mapIndex {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId), Integer.valueOf(this.currentMapIndex), Integer.valueOf(i)});
            this.currentMapIndex = i;
        }

        long getLastChunkOffset() {
            return this.lastChunkOffset;
        }

        void blockMerged(int i) {
            RemoteBlockPushResolver.logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId), Integer.valueOf(i)});
            this.mapTracker.add(i);
            this.chunkTracker.add(i);
            this.lastMergedMapIndex = i;
        }

        void resetChunkTracker() {
            this.chunkTracker.clear();
        }

        void updateChunkInfo(long j, int i) throws IOException {
            try {
                RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} index current {} updated {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId), Long.valueOf(this.lastChunkOffset), Long.valueOf(j)});
                if (this.indexMetaUpdateFailed) {
                    this.indexFile.getChannel().position(this.indexFile.getPos());
                }
                this.indexFile.getDos().writeLong(j);
                writeChunkTracker(i);
                this.indexFile.updatePos(8L);
                this.lastChunkOffset = j;
                this.indexMetaUpdateFailed = false;
            } catch (IOException e) {
                RemoteBlockPushResolver.logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId)});
                this.indexMetaUpdateFailed = true;
                throw e;
            }
        }

        private void writeChunkTracker(int i) throws IOException {
            if (i == -1) {
                return;
            }
            this.chunkTracker.add(i);
            RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId), Integer.valueOf(i)});
            if (this.indexMetaUpdateFailed) {
                this.metaFile.getChannel().position(this.metaFile.getPos());
            }
            this.chunkTracker.serialize(this.metaFile.getDos());
            this.metaFile.updatePos(this.metaFile.getChannel().position() - this.metaFile.getPos());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void incrementIOExceptions() {
            this.numIOExceptions++;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean shouldAbort(int i) {
            return this.numIOExceptions > i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void finalizePartition() throws IOException {
            if (this.dataFilePos != this.lastChunkOffset) {
                try {
                    updateChunkInfo(this.dataFilePos, this.lastMergedMapIndex);
                } catch (IOException e) {
                }
            }
            this.dataChannel.truncate(this.lastChunkOffset);
            this.indexFile.getChannel().truncate(this.indexFile.getPos());
            this.metaFile.getChannel().truncate(this.metaFile.getPos());
        }

        void closeAllFiles() {
            if (this.dataChannel != null) {
                try {
                    this.dataChannel.close();
                } catch (IOException e) {
                    RemoteBlockPushResolver.logger.warn("Error closing data channel for {} shuffleId {} reduceId {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId)});
                } finally {
                    this.dataChannel = null;
                }
            }
            try {
            } catch (IOException e2) {
                RemoteBlockPushResolver.logger.warn("Error closing meta file for {} shuffleId {} reduceId {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId)});
            } finally {
                this.metaFile = null;
            }
            if (this.metaFile != null) {
                this.metaFile.close();
            }
            if (this.indexFile != null) {
                try {
                    try {
                        this.indexFile.close();
                        this.indexFile = null;
                    } catch (IOException e3) {
                        RemoteBlockPushResolver.logger.warn("Error closing index file for {} shuffleId {} reduceId {}", new Object[]{this.appShuffleId.appId, Integer.valueOf(this.appShuffleId.shuffleId), Integer.valueOf(this.reduceId)});
                        this.indexFile = null;
                    }
                } catch (Throwable th) {
                    this.indexFile = null;
                    throw th;
                }
            }
        }

        protected void finalize() throws Throwable {
            closeAllFiles();
        }

        @VisibleForTesting
        MergeShuffleFile getIndexFile() {
            return this.indexFile;
        }

        @VisibleForTesting
        MergeShuffleFile getMetaFile() {
            return this.metaFile;
        }

        @VisibleForTesting
        FileChannel getDataChannel() {
            return this.dataChannel;
        }

        @VisibleForTesting
        int getNumIOExceptions() {
            return this.numIOExceptions;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Pairs the channel and the data output stream of one index or meta file with {@code pos}, the
     * number of bytes the owner has committed so far. The owner repositions the channel to
     * {@code pos} after a failed write and truncates the file to {@code pos} when finalizing.
     */
    @VisibleForTesting
    public static class MergeShuffleFile {
        private FileChannel channel;
        private DataOutputStream dos;
        private long pos;

        /**
         * Opens {@code file} for writing.
         *
         * @throws IOException if the file cannot be opened
         */
        @VisibleForTesting
        MergeShuffleFile(File file) throws IOException {
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            this.channel = fileOutputStream.getChannel();
            this.dos = new DataOutputStream(fileOutputStream);
        }

        @VisibleForTesting
        MergeShuffleFile(FileChannel fileChannel, DataOutputStream dataOutputStream) {
            this.channel = fileChannel;
            this.dos = dataOutputStream;
        }

        /** Advances the committed position by {@code j} bytes. */
        public void updatePos(long j) {
            this.pos += j;
        }

        /**
         * Closes the output stream and drops both references. Calling it again is a no-op rather
         * than a NullPointerException.
         *
         * @throws IOException if closing the stream fails; the references are dropped regardless
         */
        void close() throws IOException {
            try {
                if (this.dos != null) {
                    this.dos.close();
                }
            } finally {
                this.dos = null;
                this.channel = null;
            }
        }

        @VisibleForTesting
        DataOutputStream getDos() {
            return this.dos;
        }

        @VisibleForTesting
        FileChannel getChannel() {
            return this.channel;
        }

        @VisibleForTesting
        long getPos() {
            return this.pos;
        }
    }

    /* loaded from: input_file:org/apache/spark/network/shuffle/RemoteBlockPushResolver$PushBlockStreamCallback.class */
    /**
     * Stream callback that appends one pushed block (identified by its map index) to the merged data
     * file of a partition. While another map index holds the partition, incoming data is copied into
     * deferred buffers; the block is recorded as merged only in {@link #onComplete}.
     */
    static class PushBlockStreamCallback implements StreamCallbackWithID {
        private final RemoteBlockPushResolver mergeManager;
        private final String streamId;
        private final int mapIndex;
        private final AppShufflePartitionInfo partitionInfo;
        // Bytes of this block written to the data file so far.
        private int length = 0;
        // True from the first write of this block until onComplete/onFailure.
        private boolean isWriting = false;
        // Copies of data received while this block was not allowed to write; null when none.
        private List<ByteBuffer> deferredBufs;

        private PushBlockStreamCallback(RemoteBlockPushResolver remoteBlockPushResolver, String str, AppShufflePartitionInfo appShufflePartitionInfo, int i) {
            this.mergeManager = Preconditions.checkNotNull(remoteBlockPushResolver);
            this.streamId = str;
            this.partitionInfo = Preconditions.checkNotNull(appShufflePartitionInfo);
            this.mapIndex = i;
            abortIfNecessary();
        }

        @Override
        public String getID() {
            return this.streamId;
        }

        /** Writes all remaining bytes of the buffer at the partition's data position plus what this block already wrote. */
        private void writeBuf(ByteBuffer byteBuffer) throws IOException {
            while (byteBuffer.hasRemaining()) {
                long writeOffset = this.partitionInfo.getDataFilePos() + this.length;
                RemoteBlockPushResolver.logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}", this.partitionInfo.appShuffleId.appId, this.partitionInfo.appShuffleId.shuffleId, this.partitionInfo.reduceId, this.partitionInfo.getDataFilePos(), writeOffset);
                int written = this.partitionInfo.dataChannel.write(byteBuffer, writeOffset);
                this.length += written;
            }
        }

        /** @return true when no block holds the partition or this block is the holder */
        private boolean allowedToWrite() {
            int holder = this.partitionInfo.getCurrentMapIndex();
            return holder < 0 || holder == this.mapIndex;
        }

        /** @return true when this map index holds the partition without having written anything here, or was already merged */
        private boolean isDuplicateBlock() {
            if (this.partitionInfo.getCurrentMapIndex() == this.mapIndex && this.length == 0) {
                return true;
            }
            return this.partitionInfo.mapTracker.contains(this.mapIndex);
        }

        private void writeDeferredBufs() throws IOException {
            for (ByteBuffer pending : this.deferredBufs) {
                writeBuf(pending);
            }
            this.deferredBufs = null;
        }

        /** Throws a RuntimeException once the partition's IOException count is above the configured threshold. */
        private void abortIfNecessary() {
            if (!this.partitionInfo.shouldAbort(this.mergeManager.ioExceptionsThresholdDuringMerge)) {
                return;
            }
            this.deferredBufs = null;
            throw new RuntimeException(String.format("%s when merging %s", ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX, this.streamId));
        }

        private void incrementIOExceptionsAndAbortIfNecessary() {
            this.partitionInfo.incrementIOExceptions();
            abortIfNecessary();
        }

        /** @return true while the merge manager still tracks this partition's reduce id for its shuffle */
        private boolean isPartitionActive() {
            Map<Integer, AppShufflePartitionInfo> shufflePartitions = this.mergeManager.partitions.get(this.partitionInfo.appShuffleId);
            return shufflePartitions != null && shufflePartitions.containsKey(this.partitionInfo.reduceId);
        }

        @Override
        public void onData(String str, ByteBuffer byteBuffer) throws IOException {
            synchronized (this.partitionInfo) {
                if (!isPartitionActive()) {
                    // Partition no longer tracked: drop the data silently.
                    this.deferredBufs = null;
                    return;
                }
                if (!allowedToWrite()) {
                    RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} onData deferred", this.partitionInfo.appShuffleId.appId, this.partitionInfo.appShuffleId.shuffleId, this.partitionInfo.reduceId);
                    if (this.deferredBufs == null) {
                        this.deferredBufs = new ArrayList<>();
                    }
                    // Keep a private copy of the bytes; the caller's buffer is not retained.
                    ByteBuffer copy = ByteBuffer.allocate(byteBuffer.remaining());
                    copy.put(byteBuffer);
                    copy.flip();
                    this.deferredBufs.add(copy);
                    return;
                }
                if (isDuplicateBlock()) {
                    this.deferredBufs = null;
                    return;
                }
                abortIfNecessary();
                RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} onData writable", this.partitionInfo.appShuffleId.appId, this.partitionInfo.appShuffleId.shuffleId, this.partitionInfo.reduceId);
                if (this.partitionInfo.getCurrentMapIndex() < 0) {
                    this.partitionInfo.setCurrentMapIndex(this.mapIndex);
                }
                this.isWriting = true;
                try {
                    boolean hasDeferred = this.deferredBufs != null && !this.deferredBufs.isEmpty();
                    if (hasDeferred) {
                        writeDeferredBufs();
                    }
                    writeBuf(byteBuffer);
                } catch (IOException e) {
                    // Count the failure; rethrow the IOException unless the threshold check already threw.
                    incrementIOExceptionsAndAbortIfNecessary();
                    throw e;
                }
            }
        }

        @Override
        public void onComplete(String str) throws IOException {
            synchronized (this.partitionInfo) {
                RemoteBlockPushResolver.logger.trace("{} shuffleId {} reduceId {} onComplete invoked", this.partitionInfo.appShuffleId.appId, this.partitionInfo.appShuffleId.shuffleId, this.partitionInfo.reduceId);
                if (!isPartitionActive()) {
                    this.deferredBufs = null;
                    throw new RuntimeException(String.format("Block %s %s", str, ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
                }
                if (!allowedToWrite()) {
                    this.deferredBufs = null;
                    throw new RuntimeException(String.format("%s %s to merged shuffle", ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX, str));
                }
                if (isDuplicateBlock()) {
                    this.deferredBufs = null;
                    return;
                }
                if (this.partitionInfo.getCurrentMapIndex() < 0) {
                    // Nothing holds the partition, so data deferred earlier can be flushed now.
                    try {
                        boolean hasDeferred = this.deferredBufs != null && !this.deferredBufs.isEmpty();
                        if (hasDeferred) {
                            abortIfNecessary();
                            this.isWriting = true;
                            writeDeferredBufs();
                        }
                    } catch (IOException e) {
                        incrementIOExceptionsAndAbortIfNecessary();
                        throw e;
                    }
                }
                long mergedUpTo = this.partitionInfo.getDataFilePos() + this.length;
                boolean chunkRecorded = false;
                long bytesSinceLastChunk = mergedUpTo - this.partitionInfo.getLastChunkOffset();
                if (bytesSinceLastChunk >= this.mergeManager.minChunkSize) {
                    try {
                        this.partitionInfo.updateChunkInfo(mergedUpTo, this.mapIndex);
                        chunkRecorded = true;
                    } catch (IOException e) {
                        // A failed index/meta update is only counted; the block itself is still merged.
                        incrementIOExceptionsAndAbortIfNecessary();
                    }
                }
                this.partitionInfo.setDataFilePos(mergedUpTo);
                this.partitionInfo.setCurrentMapIndex(-1);
                this.partitionInfo.blockMerged(this.mapIndex);
                if (chunkRecorded) {
                    this.partitionInfo.resetChunkTracker();
                }
                this.isWriting = false;
            }
        }

        @Override
        public void onFailure(String str, Throwable th) throws IOException {
            boolean logAsError = this.mergeManager.errorHandler.shouldLogError(th);
            if (logAsError) {
                RemoteBlockPushResolver.logger.error("Encountered issue when merging {}", str, th);
            } else {
                RemoteBlockPushResolver.logger.debug("Encountered issue when merging {}", str, th);
            }
            if (this.isWriting) {
                synchronized (this.partitionInfo) {
                    if (isPartitionActive()) {
                        RemoteBlockPushResolver.logger.debug("{} shuffleId {} reduceId {} encountered failure", this.partitionInfo.appShuffleId.appId, this.partitionInfo.appShuffleId.shuffleId, this.partitionInfo.reduceId);
                        // Release the partition so another block may write.
                        this.partitionInfo.setCurrentMapIndex(-1);
                    }
                }
            }
            this.isWriting = false;
        }

        @VisibleForTesting
        AppShufflePartitionInfo getPartitionInfo() {
            return this.partitionInfo;
        }
    }

    /**
     * Creates a resolver configured from {@code transportConf}: minimum chunk size, IOException
     * threshold during merge, and an index cache bounded by total weight, where each entry weighs
     * {@code ShuffleIndexInformation.getSize()}.
     *
     * @param transportConf transport configuration supplying the merge settings
     */
    public RemoteBlockPushResolver(TransportConf transportConf) {
        this.conf = transportConf;
        this.minChunkSize = transportConf.minChunkSizeInMergedShuffleFile();
        this.ioExceptionsThresholdDuringMerge = transportConf.ioExceptionsThresholdDuringMerge();
        CacheLoader<File, ShuffleIndexInformation> indexLoader = new CacheLoader<File, ShuffleIndexInformation>() {
            @Override
            public ShuffleIndexInformation load(File file) throws IOException {
                return new ShuffleIndexInformation(file);
            }
        };
        // The explicit type witness is required: newBuilder() yields CacheBuilder<Object, Object>, so
        // without it the lambda's value parameter would be typed Object and getSize() would not resolve.
        this.indexCache = CacheBuilder.newBuilder()
            .maximumWeight(transportConf.mergedIndexCacheSize())
            .<File, ShuffleIndexInformation>weigher((file, indexInfo) -> indexInfo.getSize())
            .build(indexLoader);
    }

    /**
     * Returns the partition state for the given shuffle and reduce id, creating it (and its files)
     * on first use.
     *
     * @param appShuffleId application/shuffle the partition belongs to
     * @param i reduce id
     * @return the partition state, or null when the merged data file already exists without a
     *         matching tracked partition
     * @throws RuntimeException wrapping the IOException if the partition files cannot be created
     */
    private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(AppShuffleId appShuffleId, int i) {
        File dataFile = getMergedShuffleDataFile(appShuffleId, i);
        // A data file on disk for a shuffle that is not tracked: do not create state for it.
        if (!this.partitions.containsKey(appShuffleId) && dataFile.exists()) {
            return null;
        }
        Map<Integer, AppShufflePartitionInfo> shufflePartitions =
            this.partitions.computeIfAbsent(appShuffleId, key -> Maps.newConcurrentMap());
        return shufflePartitions.computeIfAbsent(i, reduceKey -> {
            File indexFile = getMergedShuffleIndexFile(appShuffleId, i);
            File metaFile = getMergedShuffleMetaFile(appShuffleId, i);
            try {
                // Returning null leaves the reduce id unmapped.
                return dataFile.exists()
                    ? null
                    : newAppShufflePartitionInfo(appShuffleId, i, dataFile, indexFile, metaFile);
            } catch (IOException e) {
                logger.error("Cannot create merged shuffle partition with data file {}, index file {}, and meta file {}", dataFile.getAbsolutePath(), indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
                String message = String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, i);
                throw new RuntimeException(message, e);
            }
        });
    }

    /**
     * Opens the index and meta files and builds the partition state on top of them. If any step
     * fails, the index/meta files opened so far are closed before the failure is rethrown, so no
     * stream is left open.
     *
     * @param appShuffleId application/shuffle the partition belongs to
     * @param i reduce id
     * @param file merged data file
     * @param file2 merged index file
     * @param file3 merged meta file
     * @return the newly created partition state
     * @throws IOException if a file cannot be opened or initialized
     */
    @VisibleForTesting
    AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int i, File file, File file2, File file3) throws IOException {
        MergeShuffleFile indexFile = new MergeShuffleFile(file2);
        MergeShuffleFile metaFile = null;
        try {
            metaFile = new MergeShuffleFile(file3);
            return new AppShufflePartitionInfo(appShuffleId, i, file, indexFile, metaFile);
        } catch (IOException | RuntimeException e) {
            closeAfterFailedInit(metaFile, e);
            closeAfterFailedInit(indexFile, e);
            throw e;
        }
    }

    /** Closes {@code mergeFile} if it was opened, attaching any close failure to {@code cause} as suppressed. */
    private static void closeAfterFailedInit(MergeShuffleFile mergeFile, Throwable cause) {
        if (mergeFile == null) {
            return;
        }
        try {
            mergeFile.close();
        } catch (IOException | RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Describes a merged partition: its chunk count, derived from the index file length, and a
     * buffer over the whole meta file.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 reduce id
     * @return chunk count together with the meta file contents
     * @throws RuntimeException if the index or the meta file does not exist
     */
    @Override
    public MergedBlockMeta getMergedBlockMeta(String str, int i, int i2) {
        AppShuffleId appShuffleId = new AppShuffleId(str, i);
        File indexFile = getMergedShuffleIndexFile(appShuffleId, i2);
        if (!indexFile.exists()) {
            throw new RuntimeException(String.format("Merged shuffle index file %s not found", indexFile.getPath()));
        }
        // Eight bytes per entry; the first entry is the initial zero offset, not a chunk.
        int indexBytes = (int) indexFile.length();
        int numChunks = indexBytes / 8 - 1;
        File metaFile = getMergedShuffleMetaFile(appShuffleId, i2);
        if (!metaFile.exists()) {
            throw new RuntimeException(String.format("Merged shuffle meta file %s not found", metaFile.getPath()));
        }
        ManagedBuffer chunkBitmaps = new FileSegmentManagedBuffer(this.conf, metaFile, 0L, metaFile.length());
        logger.trace("{} shuffleId {} reduceId {} num chunks {}", str, i, i2, numChunks);
        return new MergedBlockMeta(numChunks, chunkBitmaps);
    }

    /**
     * Returns a buffer over one chunk of a merged data file, located through the cached index.
     *
     * @param str application id
     * @param i shuffle id
     * @param i2 reduce id
     * @param i3 chunk id
     * @return buffer covering the chunk's segment of the data file
     * @throws RuntimeException if the data file does not exist or the index file cannot be loaded
     */
    @Override
    public ManagedBuffer getMergedBlockData(String str, int i, int i2, int i3) {
        AppShuffleId appShuffleId = new AppShuffleId(str, i);
        File dataFile = getMergedShuffleDataFile(appShuffleId, i2);
        if (!dataFile.exists()) {
            throw new RuntimeException(String.format("Merged shuffle data file %s not found", dataFile.getPath()));
        }
        File indexFile = getMergedShuffleIndexFile(appShuffleId, i2);
        ShuffleIndexInformation indexInfo;
        try {
            indexInfo = this.indexCache.get(indexFile);
        } catch (ExecutionException e) {
            throw new RuntimeException(String.format("Failed to open merged shuffle index file %s", indexFile.getPath()), e);
        }
        ShuffleIndexRecord chunk = indexInfo.getIndex(i3);
        return new FileSegmentManagedBuffer(this.conf, dataFile, chunk.getOffset(), chunk.getLength());
    }

    /**
     * Resolves a merged shuffle file name to a file under the application's merge directories.
     *
     * @param str application id; must have registered path info
     * @param str2 file name
     * @return the resolved file
     * @throws NullPointerException if no path info is registered for the application
     */
    private File getFile(String str, String str2) {
        String notRegistered = "application " + str + " is not registered or NM was restarted.";
        AppPathsInfo pathsInfo = Preconditions.checkNotNull(this.appsPathInfo.get(str), notRegistered);
        File resolved = ExecutorDiskUtils.getFile(pathsInfo.activeLocalDirs, pathsInfo.subDirsPerLocalDir, str2);
        logger.debug("Get merged file {}", resolved.getAbsolutePath());
        return resolved;
    }

    /** Returns the merged {@code .data} file of reduce id {@code i} for the given shuffle. */
    private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int i) {
        String dataFileName = String.format("%s.data", generateFileName(appShuffleId, i));
        return getFile(appShuffleId.appId, dataFileName);
    }

    /** Returns the merged {@code .index} file of reduce id {@code i} for the given shuffle. */
    private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int i) {
        String indexFileName = String.format("%s.index", generateFileName(appShuffleId, i));
        return getFile(appShuffleId.appId, indexFileName);
    }

    /** Returns the merged {@code .meta} file of reduce id {@code i} for the given shuffle. */
    private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int i) {
        String metaFileName = String.format("%s.meta", generateFileName(appShuffleId, i));
        return getFile(appShuffleId.appId, metaFileName);
    }

    /**
     * Returns the merge directories recorded for an application.
     *
     * @param str application id
     * @return the application's active merge directories
     * @throws NullPointerException if the application has no path info or no directories recorded
     */
    @Override
    public String[] getMergedBlockDirs(String str) {
        AppPathsInfo pathsInfo = Preconditions.checkNotNull(this.appsPathInfo.get(str), "application " + str + " is not registered or NM was restarted.");
        String[] mergeDirs = pathsInfo.activeLocalDirs;
        return Preconditions.checkNotNull(mergeDirs, "application " + str + " active local dirs list has not been updated by any executor registration");
    }

    /**
     * Forgets an application: removes its path info, drops every tracked partition of its shuffles
     * after closing their files, and optionally schedules deletion of its merge directories on the
     * directory cleaner executor.
     *
     * @param str application id
     * @param z whether the application's merge directories should be deleted
     * @throws NullPointerException if the application has no registered path info
     */
    @Override
    public void applicationRemoved(String str, boolean z) {
        logger.info("Application {} removed, cleanupLocalDirs = {}", str, z);
        AppPathsInfo removedPaths = Preconditions.checkNotNull(this.appsPathInfo.remove(str), "application " + str + " is not registered or NM was restarted.");
        Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> shuffles = this.partitions.entrySet().iterator();
        while (shuffles.hasNext()) {
            Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> shuffle = shuffles.next();
            if (!str.equals(shuffle.getKey().appId)) {
                continue;
            }
            // Untrack the shuffle first, then release the file handles of its partitions.
            shuffles.remove();
            for (AppShufflePartitionInfo partition : shuffle.getValue().values()) {
                partition.closeAllFiles();
            }
        }
        if (!z) {
            return;
        }
        Path[] mergeDirs = Arrays.stream(removedPaths.activeLocalDirs)
            .map(dir -> Paths.get(dir))
            .toArray(size -> new Path[size]);
        this.directoryCleaner.execute(() -> deleteExecutorDirs(mergeDirs));
    }

    /**
     * Recursively deletes each existing directory in {@code pathArr}. A failure on one directory is
     * logged and does not stop the remaining ones from being processed.
     *
     * @param pathArr directories to delete
     */
    @VisibleForTesting
    void deleteExecutorDirs(Path[] pathArr) {
        for (Path dir : pathArr) {
            try {
                if (!Files.exists(dir)) {
                    continue;
                }
                JavaUtils.deleteRecursively(dir.toFile());
                logger.debug("Successfully cleaned up directory: {}", dir);
            } catch (Exception e) {
                logger.error("Failed to delete directory: {}", dir, e);
            }
        }
    }

    /**
     * Returns the callback that will receive the data of one pushed block.
     *
     * <p>When the partition is available and has not yet merged this map index, the callback
     * appends to the merged file. Otherwise the callback ignores all data; it fails in
     * {@code onComplete} only when no partition was available, and completes silently when the map
     * index was already merged.
     *
     * @param pushBlockStream header of the pushed block (application, shuffle, map index, reduce id)
     * @return the stream callback for this block
     */
    @Override
    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream pushBlockStream) {
        AppShuffleId appShuffleId = new AppShuffleId(pushBlockStream.appId, pushBlockStream.shuffleId);
        AppShufflePartitionInfo existing = getOrCreateAppShufflePartitionInfo(appShuffleId, pushBlockStream.reduceId);
        final boolean isTooLate = existing == null;
        // Treat an already-merged map index like a missing partition for callback selection.
        boolean alreadyMerged = existing != null && existing.mapTracker.contains(pushBlockStream.mapIndex);
        AppShufflePartitionInfo target = alreadyMerged ? null : existing;
        final String streamId = String.format("%s_%d_%d_%d", OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, pushBlockStream.mapIndex, pushBlockStream.reduceId);
        if (target != null) {
            // The callback constructor takes the owning resolver as its first argument.
            return new PushBlockStreamCallback(this, streamId, target, pushBlockStream.mapIndex);
        }
        return new StreamCallbackWithID() {
            @Override
            public String getID() {
                return streamId;
            }

            @Override
            public void onData(String str, ByteBuffer byteBuffer) {
                // Data of a late or duplicate block is discarded.
            }

            @Override
            public void onComplete(String str) {
                if (isTooLate) {
                    throw new RuntimeException(String.format("Block %s %s", str, ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
                }
            }

            @Override
            public void onFailure(String str, Throwable th) {
                // Nothing was written, so there is nothing to undo.
            }
        };
    }

    /**
     * Finalizes every partition currently tracked for the given shuffle and reports what was
     * merged.
     *
     * <p>For each partition, while holding that partition's monitor, the partition is
     * finalized and its {@code mapTracker}, reduce id and last chunk offset are recorded.
     * A partition whose finalization throws {@link IOException} is logged and left out of the
     * result. In every case, including an unchecked exception, the partition's files are
     * closed and the partition is removed from the shuffle's map. Finally the shuffle itself
     * is removed from {@code partitions}.
     *
     * @param finalizeShuffleMerge the request identifying the application and shuffle
     * @return the merge statuses of the partitions that finalized successfully; empty arrays
     *         when the shuffle has no tracked partitions
     * @throws IOException declared by the interface signature; per-partition
     *         {@code IOException}s are caught and logged here
     */
    @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge finalizeShuffleMerge) throws IOException {
        MergeStatuses mergeStatuses;
        logger.info("Finalizing shuffle {} from Application {}.", Integer.valueOf(finalizeShuffleMerge.shuffleId), finalizeShuffleMerge.appId);
        AppShuffleId appShuffleId = new AppShuffleId(finalizeShuffleMerge.appId, finalizeShuffleMerge.shuffleId);
        Map<Integer, AppShufflePartitionInfo> shufflePartitions = this.partitions.get(appShuffleId);
        if (shufflePartitions == null || shufflePartitions.isEmpty()) {
            mergeStatuses = new MergeStatuses(finalizeShuffleMerge.shuffleId, new RoaringBitmap[0], new int[0], new long[0]);
        } else {
            Collection<AppShufflePartitionInfo> partitionsToFinalize = shufflePartitions.values();
            List<RoaringBitmap> bitmaps = new ArrayList<>(partitionsToFinalize.size());
            List<Integer> reduceIds = new ArrayList<>(partitionsToFinalize.size());
            List<Long> sizes = new ArrayList<>(partitionsToFinalize.size());
            Iterator<AppShufflePartitionInfo> partitionsIter = partitionsToFinalize.iterator();
            while (partitionsIter.hasNext()) {
                AppShufflePartitionInfo partition = partitionsIter.next();
                synchronized (partition) {
                    try {
                        // An IOException here means this partition is reported as not merged.
                        partition.finalizePartition();
                        bitmaps.add(partition.mapTracker);
                        reduceIds.add(Integer.valueOf(partition.reduceId));
                        sizes.add(Long.valueOf(partition.getLastChunkOffset()));
                    } catch (IOException e) {
                        logger.warn("Exception while finalizing shuffle partition {} {} {}", new Object[]{finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(partition.reduceId), e});
                    } finally {
                        // Must run on every exit path (success, IOException, or an unchecked
                        // exception) so the partition's files are never left open and the
                        // partition never lingers in the map.
                        partition.closeAllFiles();
                        partitionsIter.remove();
                    }
                }
            }
            mergeStatuses = new MergeStatuses(finalizeShuffleMerge.shuffleId, bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes));
        }
        this.partitions.remove(appShuffleId);
        logger.info("Finalized shuffle {} from Application {}.", Integer.valueOf(finalizeShuffleMerge.shuffleId), finalizeShuffleMerge.appId);
        return mergeStatuses;
    }

    /**
     * Records path information for an application the first time one of its executors
     * registers.
     *
     * <p>If {@code appsPathInfo} already holds an entry for the application, the existing
     * entry is kept and nothing new is created.
     *
     * @param str                 the application id
     * @param executorShuffleInfo the registering executor's local dirs and sub-dir count
     */
    @Override // org.apache.spark.network.shuffle.MergedShuffleFileManager
    public void registerExecutor(String str, ExecutorShuffleInfo executorShuffleInfo) {
        if (logger.isDebugEnabled()) {
            // Only pay for the array-to-string conversion when debug output is wanted.
            String localDirsText = Arrays.toString(executorShuffleInfo.localDirs);
            Integer subDirCount = Integer.valueOf(executorShuffleInfo.subDirsPerLocalDir);
            logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} num sub-dirs {}", new Object[]{str, localDirsText, subDirCount});
        }
        this.appsPathInfo.computeIfAbsent(
            str,
            unusedKey -> new AppPathsInfo(str, executorShuffleInfo.localDirs, executorShuffleInfo.subDirsPerLocalDir));
    }

    /**
     * Builds the base file name used for one partition of a shuffle, in the form
     * {@code mergedShuffle_<appId>_<shuffleId>_<i>}.
     *
     * @param appShuffleId supplies the application id and shuffle id
     * @param i            the third numeric component of the name
     * @return the formatted file name
     */
    private static String generateFileName(AppShuffleId appShuffleId, int i) {
        final String pattern = "mergedShuffle_%s_%d_%d";
        Object[] nameParts = {appShuffleId.appId, Integer.valueOf(appShuffleId.shuffleId), Integer.valueOf(i)};
        return String.format(pattern, nameParts);
    }
}
