package org.apache.spark.network.shuffle;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;
import org.apache.flink.api.common.operators.Keys;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LevelDBProvider;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.annotations.VisibleForTesting;
import org.spark_project.guava.base.Objects;
import org.spark_project.guava.cache.CacheBuilder;
import org.spark_project.guava.cache.CacheLoader;
import org.spark_project.guava.cache.LoadingCache;
import org.spark_project.guava.cache.Weigher;
import org.spark_project.guava.collect.Maps;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.class */
public class ExternalShuffleBlockResolver {
    /** Prefix of every LevelDB key under which a registered executor is stored. */
    private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";

    /** Shuffle info of each registered executor, keyed by (appId, execId). */
    @VisibleForTesting
    final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;

    /** Weight-bounded cache of parsed shuffle index files, keyed by the index file. */
    private final LoadingCache<File, ShuffleIndexInformation> shuffleIndexCache;

    /** Executor on which local-directory cleanup tasks are submitted. */
    private final Executor directoryCleaner;

    /** Transport configuration; also handed to every FileSegmentManagedBuffer created here. */
    private final TransportConf conf;

    /** File passed to LevelDBProvider.initLevelDB when opening the executor store. */
    @VisibleForTesting
    final File registeredExecutorFile;

    /** Store of registered executors; may be null, in which case nothing is persisted. */
    @VisibleForTesting
    final DB db;

    /** Shuffle manager class names that registerExecutor accepts. */
    private final List<String> knownManagers;

    private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);

    /** Shared JSON mapper used for both LevelDB keys and values. */
    private static final ObjectMapper mapper = new ObjectMapper();

    /** Store version handed to LevelDBProvider.initLevelDB. */
    private static final LevelDBProvider.StoreVersion CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);

    /**
     * Matches a run of two or more consecutive file separators.
     *
     * The separator is quoted because on Windows it is a backslash: concatenated raw it
     * would produce the regex "\{2,}", which matches the literal text "{2,}" rather than
     * repeated separators.
     */
    private static final Pattern MULTIPLE_SEPARATORS =
            Pattern.compile("(?:" + Pattern.quote(File.separator) + "){2,}");

    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalShuffleBlockResolver$AppExecId.class */
    /**
     * Identifies one executor by the pair (application id, executor id).
     *
     * Instances are immutable, are used as keys of the registered-executor map, and are
     * serialised to and from JSON through the annotated constructor.
     */
    public static class AppExecId {
        public final String appId;
        public final String execId;

        /**
         * @param str  application id, bound to the JSON property "appId"
         * @param str2 executor id, bound to the JSON property "execId"
         */
        @JsonCreator
        public AppExecId(@JsonProperty("appId") String str, @JsonProperty("execId") String str2) {
            this.appId = str;
            this.execId = str2;
        }

        /** Two ids are equal when they have the same runtime class, appId and execId. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AppExecId other = (AppExecId) obj;
            return Objects.equal(this.appId, other.appId) && Objects.equal(this.execId, other.execId);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.appId, this.execId);
        }

        @Override
        public String toString() {
            // The label is spelled out as a literal instead of being borrowed from an
            // unrelated YARN CLI constant, so this class does not depend on that library.
            return Objects.toStringHelper(this)
                    .add("appId", this.appId)
                    .add("execId", this.execId)
                    .toString();
        }
    }

    /**
     * Creates a resolver whose directory cleanup runs on a new single-thread executor built
     * from a thread factory named "spark-shuffle-directory-cleaner".
     *
     * @param transportConf configuration forwarded to the three-argument constructor
     * @param file          file forwarded to the three-argument constructor as the executor store
     * @throws IOException propagated from the three-argument constructor
     */
    public ExternalShuffleBlockResolver(TransportConf transportConf, File file) throws IOException {
        this(
                transportConf,
                file,
                Executors.newSingleThreadExecutor(
                        NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
    }

    /**
     * Creates a resolver that submits directory cleanup work to the supplied executor.
     *
     * Builds the shuffle index cache, bounded by the byte size configured under
     * "spark.shuffle.service.index.cache.size" (default "100m"), opens the LevelDB store
     * for {@code file}, and reloads previously registered executors when a store was opened.
     *
     * @param transportConf configuration to read the cache size from
     * @param file          file handed to LevelDBProvider.initLevelDB
     * @param executor      executor that runs directory cleanup tasks
     * @throws IOException if opening the store or reloading its contents fails
     */
    @VisibleForTesting
    ExternalShuffleBlockResolver(TransportConf transportConf, File file, Executor executor) throws IOException {
        this.knownManagers = Arrays.asList(
                "org.apache.spark.shuffle.sort.SortShuffleManager",
                "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
        this.conf = transportConf;
        this.registeredExecutorFile = file;

        // Each cached entry weighs as much as its index information reports.
        Weigher<File, ShuffleIndexInformation> bySize = (indexFile, indexInfo) -> indexInfo.getSize();
        CacheLoader<File, ShuffleIndexInformation> indexLoader = new CacheLoader<File, ShuffleIndexInformation>() {
            @Override
            public ShuffleIndexInformation load(File indexFile) throws IOException {
                return new ShuffleIndexInformation(indexFile);
            }
        };
        String cacheSize = transportConf.get("spark.shuffle.service.index.cache.size", "100m");
        this.shuffleIndexCache = CacheBuilder.newBuilder()
                .maximumWeight(JavaUtils.byteStringAsBytes(cacheSize))
                .weigher(bySize)
                .build(indexLoader);

        this.db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
        if (this.db == null) {
            this.executors = Maps.newConcurrentMap();
        } else {
            this.executors = reloadRegisteredExecutors(this.db);
        }
        this.directoryCleaner = executor;
    }

    /**
     * @return the number of entries currently held in the registered-executor map
     */
    public int getRegisteredExecutorsSize() {
        return executors.size();
    }

    /**
     * Records the shuffle info of an executor, in memory and (when a store is open) in LevelDB.
     *
     * A failure to persist is logged and otherwise ignored; the in-memory registration
     * still happens.
     *
     * @param str                 application id
     * @param str2                executor id
     * @param executorShuffleInfo shuffle info to associate with the executor
     * @throws UnsupportedOperationException if the info names a shuffle manager that is not
     *         in the known-managers list
     */
    public void registerExecutor(String str, String str2, ExecutorShuffleInfo executorShuffleInfo) {
        AppExecId fullId = new AppExecId(str, str2);
        logger.info("Registered executor {} with {}", fullId, executorShuffleInfo);

        boolean supported = knownManagers.contains(executorShuffleInfo.shuffleManager);
        if (!supported) {
            throw new UnsupportedOperationException("Unsupported shuffle manager of executor: " + executorShuffleInfo);
        }

        if (db != null) {
            try {
                byte[] key = dbAppExecKey(fullId);
                byte[] value = mapper.writeValueAsString(executorShuffleInfo).getBytes(StandardCharsets.UTF_8);
                db.put(key, value);
            } catch (Exception e) {
                // Best effort: persistence problems must not prevent the registration itself.
                logger.error("Error saving registered executors", e);
            }
        }
        executors.put(fullId, executorShuffleInfo);
    }

    /**
     * Looks up the registered executor and returns the buffer for one of its shuffle blocks.
     *
     * @param str  application id
     * @param str2 executor id
     * @param i    first number in the shuffle file name (presumably the shuffle id)
     * @param i2   second number in the shuffle file name (presumably the map id)
     * @param i3   index looked up in the shuffle index file (presumably the reduce id)
     * @return the buffer produced by the sort-based shuffle lookup
     * @throws RuntimeException if no executor is registered under (str, str2)
     */
    public ManagedBuffer getBlockData(String str, String str2, int i, int i2, int i3) {
        AppExecId fullId = new AppExecId(str, str2);
        ExecutorShuffleInfo info = executors.get(fullId);
        if (info != null) {
            return getSortBasedShuffleBlockData(info, i, i2, i3);
        }
        throw new RuntimeException(String.format("Executor is not registered (appId=%s, execId=%s)", str, str2));
    }

    /**
     * Forgets every executor registered for an application.
     *
     * Each matching executor is removed from the in-memory map and, when a store is open,
     * from LevelDB (a failed delete is logged and skipped). When {@code z} is true, deletion
     * of the executor's local directories is submitted to the directory cleaner.
     *
     * @param str application id whose executors are removed
     * @param z   whether the executors' local directories should also be deleted
     */
    public void applicationRemoved(String str, boolean z) {
        logger.info("Application {} removed, cleanupLocalDirs = {}", str, z);
        Iterator<Map.Entry<AppExecId, ExecutorShuffleInfo>> entries = executors.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<AppExecId, ExecutorShuffleInfo> entry = entries.next();
            AppExecId fullId = entry.getKey();
            ExecutorShuffleInfo info = entry.getValue();
            if (!str.equals(fullId.appId)) {
                continue;
            }

            entries.remove();
            if (db != null) {
                try {
                    db.delete(dbAppExecKey(fullId));
                } catch (IOException e) {
                    logger.error("Error deleting {} from executor state db", str, e);
                }
            }
            if (z) {
                logger.info("Cleaning up executor {}'s {} local dirs", fullId, info.localDirs.length);
                directoryCleaner.execute(() -> deleteExecutorDirs(info.localDirs));
            }
        }
    }

    /**
     * Schedules deletion of the non-shuffle files of a finished executor.
     *
     * The executor stays registered; only a cleanup task is submitted. When the executor
     * is unknown, this is logged and nothing else happens.
     *
     * @param str  executor id
     * @param str2 application id
     */
    public void executorRemoved(String str, String str2) {
        logger.info("Clean up non-shuffle files associated with the finished executor {}", str);
        AppExecId fullId = new AppExecId(str2, str);
        ExecutorShuffleInfo info = executors.get(fullId);
        if (info == null) {
            logger.info("Executor is not registered (appId={}, execId={})", str2, str);
            return;
        }
        logger.info("Cleaning up non-shuffle files in executor {}'s {} local dirs", fullId, info.localDirs.length);
        directoryCleaner.execute(() -> deleteNonShuffleFiles(info.localDirs));
    }

    /**
     * Recursively deletes each of the given directories.
     *
     * A failure on one directory is logged and does not stop processing of the others.
     *
     * @param strArr paths of the directories to delete
     */
    private void deleteExecutorDirs(String[] strArr) {
        for (int idx = 0; idx < strArr.length; idx++) {
            String dir = strArr[idx];
            try {
                File target = new File(dir);
                JavaUtils.deleteRecursively(target);
                logger.debug("Successfully cleaned up directory: {}", dir);
            } catch (Exception e) {
                logger.error("Failed to delete directory: " + dir, e);
            }
        }
    }

    /**
     * Runs a filtered recursive delete over each of the given directories.
     *
     * The filter accepts every name that ends in neither ".index" nor ".data"; presumably
     * JavaUtils.deleteRecursively deletes only accepted files, so shuffle files survive.
     * A failure on one directory is logged and does not stop processing of the others.
     *
     * @param strArr paths of the directories to clean
     */
    private void deleteNonShuffleFiles(String[] strArr) {
        FilenameFilter nonShuffleOnly =
                (dir, name) -> !name.endsWith(".index") && !name.endsWith(".data");
        for (int idx = 0; idx < strArr.length; idx++) {
            String localDir = strArr[idx];
            try {
                File target = new File(localDir);
                JavaUtils.deleteRecursively(target, nonShuffleOnly);
                logger.debug("Successfully cleaned up non-shuffle files in directory: {}", localDir);
            } catch (Exception e) {
                logger.error("Failed to delete non-shuffle files in directory: " + localDir, e);
            }
        }
    }

    /**
     * Builds the buffer for one block of a sort-based shuffle.
     *
     * The files are named "shuffle_{i}_{i2}_0.index" and "shuffle_{i}_{i2}_0.data". The
     * index file is read through the index cache to obtain the offset and length of entry
     * {@code i3}, and the returned buffer covers that segment of the data file.
     *
     * @param executorShuffleInfo local directories and sub-directory count of the executor
     * @param i  first number in the file name (presumably the shuffle id)
     * @param i2 second number in the file name (presumably the map id)
     * @param i3 entry to look up in the index file (presumably the reduce id)
     * @return a buffer over the selected segment of the data file
     * @throws RuntimeException if loading the index file through the cache fails
     */
    private ManagedBuffer getSortBasedShuffleBlockData(ExecutorShuffleInfo executorShuffleInfo, int i, int i2, int i3) {
        // The "_" separator is a plain literal; it must not be borrowed from an unrelated
        // library constant that merely happens to have the same value.
        String baseName = "shuffle_" + i + "_" + i2 + "_0";
        File indexFile = getFile(executorShuffleInfo.localDirs, executorShuffleInfo.subDirsPerLocalDir, baseName + ".index");
        try {
            ShuffleIndexRecord record = this.shuffleIndexCache.get(indexFile).getIndex(i3);
            File dataFile = getFile(executorShuffleInfo.localDirs, executorShuffleInfo.subDirsPerLocalDir, baseName + ".data");
            return new FileSegmentManagedBuffer(this.conf, dataFile, record.getOffset(), record.getLength());
        } catch (ExecutionException e) {
            throw new RuntimeException("Failed to open file: " + indexFile, e);
        }
    }

    /**
     * Maps a file name to its location under one of the local directories.
     *
     * The non-negative hash of the name selects the local directory (hash modulo the number
     * of directories) and the two-digit hexadecimal sub-directory (hash divided by the
     * number of directories, modulo {@code i}).
     *
     * @param strArr local directories to choose from
     * @param i      number of sub-directories per local directory
     * @param str    file name
     * @return the file at localDir/subDir/name, with a normalized, interned path
     */
    @VisibleForTesting
    static File getFile(String[] strArr, int i, String str) {
        int hash = JavaUtils.nonNegativeHash(str);
        String localDir = strArr[hash % strArr.length];
        int subDirId = (hash / strArr.length) % i;
        String subDir = String.format("%02x", subDirId);
        String path = createNormalizedInternedPathname(localDir, subDir, str);
        return new File(path);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the LevelDB store if one is open; a failure to close is logged, not thrown.
     */
    public void close() {
        if (db == null) {
            return;
        }
        try {
            db.close();
        } catch (IOException e) {
            logger.error("Exception closing leveldb with registered executors", e);
        }
    }

    /**
     * Joins three path components and returns the normalized, interned result.
     *
     * The components are joined with the platform file separator, every match of
     * MULTIPLE_SEPARATORS is replaced by a single "/", and one trailing "/" is dropped
     * unless the whole path is just "/".
     *
     * @param str  first path component
     * @param str2 second path component
     * @param str3 third path component
     * @return the interned normalized path
     */
    @VisibleForTesting
    static String createNormalizedInternedPathname(String str, String str2, String str3) {
        String joined = String.join(File.separator, str, str2, str3);
        String normalized = MULTIPLE_SEPARATORS.matcher(joined).replaceAll("/");
        boolean hasTrailingSlash = normalized.length() > 1 && normalized.endsWith("/");
        if (hasTrailingSlash) {
            normalized = normalized.substring(0, normalized.length() - 1);
        }
        return normalized.intern();
    }

    /**
     * Builds the LevelDB key of an executor: the key prefix, a ";" and the id as JSON,
     * encoded as UTF-8.
     *
     * @param appExecId id of the executor
     * @return the key bytes
     * @throws IOException if the id cannot be serialised to JSON
     */
    private static byte[] dbAppExecKey(AppExecId appExecId) throws IOException {
        String json = mapper.writeValueAsString(appExecId);
        String key = APP_KEY_PREFIX + ";" + json;
        return key.getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Recovers the executor id from a LevelDB key.
     *
     * The key prefix and the single character following it are skipped; the remainder is
     * parsed as the JSON form of an AppExecId.
     *
     * @param str key decoded as a string
     * @return the id encoded in the key
     * @throws IllegalArgumentException if the key does not start with the key prefix
     * @throws IOException if the JSON part cannot be parsed
     */
    private static AppExecId parseDbAppExecKey(String str) throws IOException {
        if (str.startsWith(APP_KEY_PREFIX)) {
            String json = str.substring(APP_KEY_PREFIX.length() + 1);
            return mapper.readValue(json, AppExecId.class);
        }
        throw new IllegalArgumentException("expected a string starting with AppExecShuffleInfo");
    }

    /* JADX WARN: Type inference failed for: r0v4, types: [org.iq80.leveldb.DBIterator] */
    @VisibleForTesting
    static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(DB db) throws IOException {
        ConcurrentMap<AppExecId, ExecutorShuffleInfo> newConcurrentMap = Maps.newConcurrentMap();
        if (db != null) {
            ?? iterator2 = db.iterator2();
            iterator2.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
            while (iterator2.hasNext()) {
                Map.Entry entry = (Map.Entry) iterator2.next();
                String str = new String((byte[]) entry.getKey(), StandardCharsets.UTF_8);
                if (!str.startsWith(APP_KEY_PREFIX)) {
                    break;
                }
                AppExecId parseDbAppExecKey = parseDbAppExecKey(str);
                logger.info("Reloading registered executors: " + parseDbAppExecKey.toString());
                newConcurrentMap.put(parseDbAppExecKey, (ExecutorShuffleInfo) mapper.readValue((byte[]) entry.getValue(), ExecutorShuffleInfo.class));
            }
        }
        return newConcurrentMap;
    }
}
