package org.apache.spark.network.shuffle;

import com.codahale.metrics.MetricSet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.crypto.AuthClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.shuffle.RetryingBlockFetcher;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.BlocksRemoved;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.RemoveBlocks;
import org.apache.spark.network.util.TransportConf;
import org.sparkproject.guava.collect.Lists;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockStoreClient.class */
public class ExternalBlockStoreClient extends BlockStoreClient {
    private static final ErrorHandler PUSH_ERROR_HANDLER;
    private final TransportConf conf;
    private final boolean authEnabled;
    private final SecretKeyHolder secretKeyHolder;
    private final long registrationTimeoutMs;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ExternalBlockStoreClient(TransportConf transportConf, SecretKeyHolder secretKeyHolder, boolean z, long j) {
        this.conf = transportConf;
        this.secretKeyHolder = secretKeyHolder;
        this.authEnabled = z;
        this.registrationTimeoutMs = j;
    }

    public void init(String str) {
        this.appId = str;
        TransportContext transportContext = new TransportContext(this.conf, new NoOpRpcHandler(), true, true);
        ArrayList newArrayList = Lists.newArrayList();
        if (this.authEnabled) {
            newArrayList.add(new AuthClientBootstrap(this.conf, str, this.secretKeyHolder));
        }
        this.clientFactory = transportContext.createClientFactory(newArrayList);
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void fetchBlocks(String str, int i, String str2, String[] strArr, BlockFetchingListener blockFetchingListener, DownloadFileManager downloadFileManager) {
        checkInit();
        this.logger.debug("External shuffle fetch from {}:{} (executor id {})", new Object[]{str, Integer.valueOf(i), str2});
        try {
            int maxIORetries = this.conf.maxIORetries();
            RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (strArr2, blockFetchingListener2) -> {
                if (this.clientFactory != null) {
                    new OneForOneBlockFetcher(this.clientFactory.createClient(str, i, maxIORetries > 0), this.appId, str2, strArr2, blockFetchingListener2, this.conf, downloadFileManager).start();
                } else {
                    this.logger.info("This clientFactory was closed. Skipping further block fetch retries.");
                }
            };
            if (maxIORetries > 0) {
                new RetryingBlockFetcher(this.conf, blockFetchStarter, strArr, blockFetchingListener).start();
            } else {
                blockFetchStarter.createAndStart(strArr, blockFetchingListener);
            }
        } catch (Exception e) {
            this.logger.error("Exception while beginning fetchBlocks", e);
            for (String str3 : strArr) {
                blockFetchingListener.onBlockFetchFailure(str3, e);
            }
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void pushBlocks(String str, int i, String[] strArr, ManagedBuffer[] managedBufferArr, BlockFetchingListener blockFetchingListener) {
        checkInit();
        if (!$assertionsDisabled && strArr.length != managedBufferArr.length) {
            throw new AssertionError("Number of block ids and buffers do not match.");
        }
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            hashMap.put(strArr[i2], managedBufferArr[i2]);
        }
        this.logger.debug("Push {} shuffle blocks to {}:{}", new Object[]{Integer.valueOf(strArr.length), str, Integer.valueOf(i)});
        try {
            RetryingBlockFetcher.BlockFetchStarter blockFetchStarter = (strArr2, blockFetchingListener2) -> {
                new OneForOneBlockPusher(this.clientFactory.createClient(str, i), this.appId, strArr2, blockFetchingListener2, hashMap).start();
            };
            if (this.conf.maxIORetries() > 0) {
                new RetryingBlockFetcher(this.conf, blockFetchStarter, strArr, blockFetchingListener, PUSH_ERROR_HANDLER).start();
            } else {
                blockFetchStarter.createAndStart(strArr, blockFetchingListener);
            }
        } catch (Exception e) {
            this.logger.error("Exception while beginning pushBlocks", e);
            for (String str2 : strArr) {
                blockFetchingListener.onBlockFetchFailure(str2, e);
            }
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void finalizeShuffleMerge(String str, int i, int i2, final MergeFinalizerListener mergeFinalizerListener) {
        checkInit();
        try {
            this.clientFactory.createClient(str, i).sendRpc(new FinalizeShuffleMerge(this.appId, i2).toByteBuffer(), new RpcResponseCallback() { // from class: org.apache.spark.network.shuffle.ExternalBlockStoreClient.1
                @Override // org.apache.spark.network.client.RpcResponseCallback
                public void onSuccess(ByteBuffer byteBuffer) {
                    mergeFinalizerListener.onShuffleMergeSuccess((MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer));
                }

                @Override // org.apache.spark.network.client.RpcResponseCallback
                public void onFailure(Throwable th) {
                    mergeFinalizerListener.onShuffleMergeFailure(th);
                }
            });
        } catch (Exception e) {
            this.logger.error("Exception while sending finalizeShuffleMerge request to {}:{}", new Object[]{str, Integer.valueOf(i), e});
            mergeFinalizerListener.onShuffleMergeFailure(e);
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public MetricSet shuffleMetrics() {
        checkInit();
        return this.clientFactory.getAllMetrics();
    }

    public void registerWithShuffleServer(String str, int i, String str2, ExecutorShuffleInfo executorShuffleInfo) throws IOException, InterruptedException {
        checkInit();
        TransportClient createClient = this.clientFactory.createClient(str, i);
        Throwable th = null;
        try {
            createClient.sendRpcSync(new RegisterExecutor(this.appId, str2, executorShuffleInfo).toByteBuffer(), this.registrationTimeoutMs);
            if (createClient != null) {
                if (0 == 0) {
                    createClient.close();
                    return;
                }
                try {
                    createClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createClient != null) {
                if (0 != 0) {
                    try {
                        createClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createClient.close();
                }
            }
            throw th3;
        }
    }

    public Future<Integer> removeBlocks(String str, int i, final String str2, final String[] strArr) throws IOException, InterruptedException {
        checkInit();
        final CompletableFuture completableFuture = new CompletableFuture();
        this.clientFactory.createClient(str, i).sendRpc(new RemoveBlocks(this.appId, str2, strArr).toByteBuffer(), new RpcResponseCallback() { // from class: org.apache.spark.network.shuffle.ExternalBlockStoreClient.2
            @Override // org.apache.spark.network.client.RpcResponseCallback
            public void onSuccess(ByteBuffer byteBuffer) {
                try {
                    completableFuture.complete(Integer.valueOf(((BlocksRemoved) BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer)).numRemovedBlocks));
                } catch (Throwable th) {
                    ExternalBlockStoreClient.this.logger.warn("Error trying to remove RDD blocks " + Arrays.toString(strArr) + " via external shuffle service from executor: " + str2, th);
                    completableFuture.complete(0);
                }
            }

            @Override // org.apache.spark.network.client.RpcResponseCallback
            public void onFailure(Throwable th) {
                ExternalBlockStoreClient.this.logger.warn("Error trying to remove RDD blocks " + Arrays.toString(strArr) + " via external shuffle service from executor: " + str2, th);
                completableFuture.complete(0);
            }
        });
        return completableFuture;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        checkInit();
        if (this.clientFactory != null) {
            this.clientFactory.close();
            this.clientFactory = null;
        }
    }

    static {
        $assertionsDisabled = !ExternalBlockStoreClient.class.desiredAssertionStatus();
        PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
    }
}
