package org.apache.flink.runtime.rpc.pekko;

import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import one.profiler.Events;
import org.apache.commons.text.lookup.StringLookupFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.exceptions.RpcRuntimeException;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.pekko.SupervisorActor;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.DeadLetter;
import org.apache.pekko.actor.Props;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.reflect.ClassTag$;

@ThreadSafe
/* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/pekko/PekkoRpcService.class */
public class PekkoRpcService implements RpcService {
    private static final Logger LOG = LoggerFactory.getLogger(PekkoRpcService.class);

    /** RPC version number; {@link #getVersion()} returns the same value. */
    static final int VERSION = 2;

    /** Monitor guarding {@link #actors} and the check-and-set of {@link #stopped}. */
    private final Object lock;

    /** Actor system that hosts every actor created by this service. */
    private final ActorSystem actorSystem;

    /** Supplies the ask timeout, the maximum frame size and the serialization flag. */
    private final PekkoRpcServiceConfiguration configuration;

    /** Class loader handed to actors, invocation handlers and completion callbacks. */
    private final ClassLoader flinkClassLoader;

    /** Endpoints started through this service, keyed by the actor that backs them. */
    // The annotation value names the guarding field; it previously referenced an unrelated
    // profiler constant that merely carried the same string.
    @GuardedBy("lock")
    private final Map<ActorRef, RpcEndpoint> actors;

    /** Host of the actor system's address, or the empty string when no host is defined. */
    private final String address;

    /** Port of the actor system's address, or {@code -1} when no port is defined. */
    private final int port;

    /** Value of {@code captureAskCallStack()} from the configuration, passed to handlers. */
    private final boolean captureAskCallstacks;

    /** Scheduled executor returned by {@link #getScheduledExecutor()}. */
    private final ScheduledExecutor internalScheduledExecutor;

    /** Future returned by {@link #closeAsync()}; receives the outcome of the shutdown chain. */
    private final CompletableFuture<Void> terminationFuture;

    /** Supervisor actor together with the executor created for it. */
    private final Supervisor supervisor;

    /** Set to {@code true} by {@link #closeAsync()}; written only while holding {@link #lock}. */
    private volatile boolean stopped;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/pekko/PekkoRpcService$Supervisor.class */
    /**
     * Bundles the supervisor actor with the executor service that was created alongside it, so
     * that the executor can be shut down when the RPC service closes.
     */
    public static final class Supervisor implements AutoCloseableAsync {
        private final ExecutorService terminationFutureExecutor;
        private final ActorRef actor;

        private Supervisor(ActorRef supervisorActor, ExecutorService executor) {
            actor = supervisorActor;
            terminationFutureExecutor = executor;
        }

        /** Factory wrapping the private constructor. */
        public static Supervisor create(ActorRef actorRef, ExecutorService executorService) {
            return new Supervisor(actorRef, executorService);
        }

        /** Returns the supervisor actor reference. */
        public ActorRef getActor() {
            return actor;
        }

        /**
         * Shuts the wrapped executor down via {@code ExecutorUtils.nonBlockingShutdown} with a
         * 30 second timeout and returns the resulting future.
         */
        @Override
        public CompletableFuture<Void> closeAsync() {
            final long timeoutSeconds = 30L;
            return ExecutorUtils.nonBlockingShutdown(
                    timeoutSeconds, TimeUnit.SECONDS, terminationFutureExecutor);
        }
    }

    /**
     * Creates an RPC service on the given actor system, using the class loader that loaded
     * {@code PekkoRpcService} as the Flink class loader.
     *
     * @param actorSystem actor system to run on
     * @param pekkoRpcServiceConfiguration configuration of the service
     */
    @VisibleForTesting
    public PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration pekkoRpcServiceConfiguration) {
        // Delegates to the three-argument constructor.
        this(actorSystem, pekkoRpcServiceConfiguration, PekkoRpcService.class.getClassLoader());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an RPC service on the given actor system. Besides initialising its state, the
     * constructor starts the supervisor actor and subscribes a dead-letters actor.
     *
     * @param actorSystem actor system to run on; must not be null
     * @param pekkoRpcServiceConfiguration configuration of the service; must not be null
     * @param classLoader class loader used as the Flink class loader; must not be null
     */
    public PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration pekkoRpcServiceConfiguration, ClassLoader classLoader) {
        this.lock = new Object();
        this.actors = CollectionUtil.newHashMapWithExpectedSize(4);
        this.actorSystem = Preconditions.checkNotNull(actorSystem, "actor system");
        this.configuration = Preconditions.checkNotNull(pekkoRpcServiceConfiguration, "pekko rpc service configuration");
        this.flinkClassLoader = Preconditions.checkNotNull(classLoader, "flinkClassLoader");

        // Host and port are optional parts of the actor system address; fall back to "" and -1.
        final Address systemAddress = PekkoUtils.getAddress(actorSystem);
        final Option<String> host = systemAddress.host();
        this.address = host.isDefined() ? host.get() : "";
        this.port = systemAddress.port().isDefined()
                ? ((Integer) systemAddress.port().get()).intValue()
                : -1;

        this.captureAskCallstacks = pekkoRpcServiceConfiguration.captureAskCallStack();
        this.internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem, classLoader);
        this.terminationFuture = new CompletableFuture<>();
        this.stopped = false;

        this.supervisor = startSupervisorActor();
        startDeadLettersActor();
    }

    /**
     * Creates the actor named {@code deadLettersActor} and subscribes it to {@link DeadLetter}
     * events on the actor system's event stream.
     */
    private void startDeadLettersActor() {
        final ActorRef deadLettersActor =
                actorSystem.actorOf(DeadLettersActor.getProps(), "deadLettersActor");
        actorSystem.eventStream().subscribe(deadLettersActor, DeadLetter.class);
    }

    /**
     * Starts the supervisor actor, handing it a single-threaded executor that is wrapped so its
     * tasks run with the Flink class loader as context class loader.
     *
     * @return the supervisor actor paired with the unwrapped executor, for later shutdown
     */
    private Supervisor startSupervisorActor() {
        final ExecutorService terminationFutureExecutor =
                Executors.newSingleThreadExecutor(
                        new ExecutorThreadFactory("RpcService-Supervisor-Termination-Future-Executor"));
        final ActorRef supervisorActor =
                SupervisorActor.startSupervisorActor(
                        actorSystem,
                        ClassLoadingUtils.withContextClassLoader(terminationFutureExecutor, flinkClassLoader));
        return Supervisor.create(supervisorActor, terminationFutureExecutor);
    }

    /** Returns the actor system this RPC service runs on. */
    public ActorSystem getActorSystem() {
        return this.actorSystem;
    }

    /**
     * Returns the RPC version that is sent in the remote handshake and passed to newly created
     * RPC actors.
     */
    protected int getVersion() {
        return VERSION;
    }

    /**
     * Returns the host part of the actor system's address as determined at construction time;
     * the empty string if the address had no host.
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public String getAddress() {
        return this.address;
    }

    /**
     * Returns the port of the actor system's address as determined at construction time;
     * {@code -1} if the address had no port.
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public int getPort() {
        return this.port;
    }

    /**
     * Returns the given RPC server viewed as the requested gateway type.
     *
     * @param cls gateway interface the caller wants
     * @param rpcServer server to view as {@code cls}
     * @return {@code rpcServer}, typed as {@code C}
     * @throws ClassCastException if {@code rpcServer} is not an instance of {@code cls}
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public <C extends RpcGateway> C getSelfGateway(Class<C> cls, RpcServer rpcServer) {
        if (!cls.isInstance(rpcServer)) {
            throw new ClassCastException("RpcEndpoint does not implement the RpcGateway interface of type " + cls + '.');
        }
        // Class.cast gives a checked conversion to C; returning rpcServer directly does not
        // type-check against the declared return type.
        return cls.cast(rpcServer);
    }

    /**
     * Connects to the RPC endpoint at the given address and returns a future for a gateway
     * proxy of the requested type, backed by an unfenced invocation handler.
     *
     * @param str address of the remote endpoint
     * @param cls gateway interface the proxy should implement
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public <C extends RpcGateway> CompletableFuture<C> connect(String str, Class<C> cls) {
        final Function<ActorRef, InvocationHandler> handlerFactory = remoteActor -> {
            final Tuple2<String, String> addressAndHostname = extractAddressHostname(remoteActor);
            // A connected (remote) gateway has no termination future, hence the null argument.
            return new PekkoInvocationHandler(
                    addressAndHostname.f0,
                    addressAndHostname.f1,
                    remoteActor,
                    configuration.getTimeout(),
                    configuration.getMaximumFramesize(),
                    configuration.isForceRpcInvocationSerialization(),
                    null,
                    captureAskCallstacks,
                    flinkClassLoader);
        };
        return connectInternal(str, cls, handlerFactory);
    }

    /**
     * Connects to the fenced RPC endpoint at the given address and returns a future for a
     * gateway proxy whose invocation handler always supplies the given fencing token.
     *
     * @param str address of the remote endpoint
     * @param f fencing token the handler supplies
     * @param cls gateway interface the proxy should implement
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(String str, F f, Class<C> cls) {
        final Function<ActorRef, InvocationHandler> handlerFactory = remoteActor -> {
            final Tuple2<String, String> addressAndHostname = extractAddressHostname(remoteActor);
            // A connected (remote) gateway has no termination future, hence the null argument.
            return new FencedPekkoInvocationHandler(
                    addressAndHostname.f0,
                    addressAndHostname.f1,
                    remoteActor,
                    configuration.getTimeout(),
                    configuration.getMaximumFramesize(),
                    configuration.isForceRpcInvocationSerialization(),
                    null,
                    () -> f,
                    captureAskCallstacks,
                    flinkClassLoader);
        };
        return connectInternal(str, cls, handlerFactory);
    }

    /**
     * Registers an actor for the given endpoint and returns a dynamic proxy that implements the
     * endpoint's RPC gateway interfaces plus {@link RpcServer} and {@code PekkoBasedEndpoint}.
     *
     * @param c endpoint to start a server for; must not be null
     * @return proxy through which the endpoint is addressed
     * @throws IllegalStateException if the service has already been stopped
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C c) {
        Preconditions.checkNotNull(c, "rpc endpoint");

        final SupervisorActor.ActorRegistration registration = registerRpcActor(c);
        final ActorRef actorRef = registration.getActorRef();
        final CompletableFuture<Void> actorTerminationFuture = registration.getTerminationFuture();
        LOG.info("Starting RPC endpoint for {} at {} .", c.getClass().getName(), actorRef.path());

        // Same address/hostname derivation the connect() paths use.
        final Tuple2<String, String> addressAndHostname = extractAddressHostname(actorRef);
        final String rpcUrl = addressAndHostname.f0;
        final String hostname = addressAndHostname.f1;

        final HashSet<Class<?>> proxyInterfaces =
                new HashSet<>(RpcUtils.extractImplementedRpcGateways(c.getClass()));
        proxyInterfaces.add(RpcServer.class);
        proxyInterfaces.add(PekkoBasedEndpoint.class);

        final InvocationHandler invocationHandler;
        if (c instanceof FencedRpcEndpoint) {
            // Fenced endpoints get a handler that reads the current fencing token on demand.
            invocationHandler =
                    new FencedPekkoInvocationHandler<>(
                            rpcUrl,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            ((FencedRpcEndpoint<?>) c)::getFencingToken,
                            captureAskCallstacks,
                            flinkClassLoader);
        } else {
            invocationHandler =
                    new PekkoInvocationHandler(
                            rpcUrl,
                            hostname,
                            actorRef,
                            configuration.getTimeout(),
                            configuration.getMaximumFramesize(),
                            configuration.isForceRpcInvocationSerialization(),
                            actorTerminationFuture,
                            captureAskCallstacks,
                            flinkClassLoader);
        }

        // The proxy is defined in the class loader of this class.
        final ClassLoader proxyClassLoader = getClass().getClassLoader();
        return (RpcServer)
                Proxy.newProxyInstance(
                        proxyClassLoader,
                        proxyInterfaces.toArray(new Class<?>[proxyInterfaces.size()]),
                        invocationHandler);
    }

    /**
     * Asks the supervisor to start the actor for the given endpoint and records the pairing in
     * {@code actors}. The whole operation runs under {@code lock}.
     *
     * @param c endpoint to create an actor for
     * @return registration holding the actor reference and its termination future
     * @throws IllegalStateException if the service has already been stopped
     * @throws RpcRuntimeException if the supervisor could not create the actor
     */
    private <C extends RpcEndpoint & RpcGateway> SupervisorActor.ActorRegistration registerRpcActor(C c) {
        final Class<?> actorClass;
        if (c instanceof FencedRpcEndpoint) {
            actorClass = FencedPekkoRpcActor.class;
        } else {
            actorClass = PekkoRpcActor.class;
        }

        synchronized (lock) {
            Preconditions.checkState(!stopped, "RpcService is stopped");

            final SupervisorActor.ActorRegistration registration =
                    SupervisorActor.startRpcActor(
                                    supervisor.getActor(),
                                    actorTerminationFuture ->
                                            Props.create(
                                                    actorClass,
                                                    c,
                                                    actorTerminationFuture,
                                                    getVersion(),
                                                    configuration.getMaximumFramesize(),
                                                    configuration.isForceRpcInvocationSerialization(),
                                                    flinkClassLoader),
                                    c.getEndpointId())
                            .orElseThrow(
                                    cause ->
                                            new RpcRuntimeException(
                                                    String.format(
                                                            "Could not create the %s for %s.",
                                                            PekkoRpcActor.class.getSimpleName(),
                                                            c.getEndpointId()),
                                                    cause));

            actors.put(registration.getActorRef(), c);
            return registration;
        }
    }

    /**
     * Stops the actor behind the given RPC server if it was started by this service. Servers
     * that are not {@code PekkoBasedEndpoint}s are ignored, as is every call made after the
     * service has been stopped.
     *
     * @param rpcServer server whose actor should be terminated
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public void stopServer(RpcServer rpcServer) {
        if (!(rpcServer instanceof PekkoBasedEndpoint)) {
            return;
        }
        final PekkoBasedEndpoint endpointClient = (PekkoBasedEndpoint) rpcServer;

        synchronized (lock) {
            if (stopped) {
                return;
            }
            final RpcEndpoint registeredEndpoint = actors.remove(endpointClient.getActorRef());
            if (registeredEndpoint == null) {
                LOG.debug("RPC endpoint {} already stopped or from different RPC service", rpcServer.getAddress());
                return;
            }
            terminateRpcActor(endpointClient.getActorRef(), registeredEndpoint);
        }
    }

    /**
     * Stops the RPC service. The first call terminates all registered RPC actors, then closes
     * the supervisor, then terminates the actor system; the outcome of that chain is forwarded
     * to the termination future. Later calls return the same future without further action.
     *
     * @return future that receives the outcome of the shutdown chain
     */
    @Override // org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        final CompletableFuture<Void> rpcActorsTerminated;

        // Only the state transition and the walk over the actor map need the lock.
        synchronized (lock) {
            if (stopped) {
                return terminationFuture;
            }
            LOG.info("Stopping Pekko RPC service.");
            stopped = true;
            rpcActorsTerminated = terminateRpcActors();
        }

        // Attached outside the lock: a dependent action on an already completed future runs in
        // the attaching thread, which would otherwise execute supervisor shutdown, actor system
        // termination and callbacks on the termination future while holding the lock.
        final CompletableFuture<Void> supervisorTerminated =
                FutureUtils.composeAfterwards(rpcActorsTerminated, supervisor::closeAsync);
        final CompletableFuture<Void> actorSystemTerminated =
                FutureUtils.composeAfterwards(
                        supervisorTerminated,
                        () -> ScalaFutureUtils.toJava(actorSystem.terminate()));

        actorSystemTerminated.whenComplete((ignored, failure) -> {
            // Forward the result with the Flink class loader as context class loader.
            ClassLoadingUtils.runWithContextClassLoader(() -> {
                FutureUtils.doForward(ignored, failure, terminationFuture);
            }, flinkClassLoader);
            LOG.info("Stopped Pekko RPC service.");
        });

        return terminationFuture;
    }

    /**
     * Sends a terminate message to every registered RPC actor and empties the registry.
     * Callers must hold {@code lock}.
     *
     * @return future combining the termination futures of all endpoints that were registered
     */
    @Nonnull
    @GuardedBy("lock")
    private CompletableFuture<Void> terminateRpcActors() {
        final ArrayList<CompletableFuture<Void>> terminationFutures = new ArrayList<>(actors.size());
        for (Map.Entry<ActorRef, RpcEndpoint> registered : actors.entrySet()) {
            terminationFutures.add(terminateRpcActor(registered.getKey(), registered.getValue()));
        }
        actors.clear();
        return FutureUtils.waitForAll(terminationFutures);
    }

    /**
     * Sends {@code ControlMessages.TERMINATE} to the given actor, without a sender, and returns
     * the termination future of the endpoint associated with it.
     *
     * @param actorRef actor to send the terminate message to
     * @param rpcEndpoint endpoint whose termination future is returned
     */
    private CompletableFuture<Void> terminateRpcActor(ActorRef actorRef, RpcEndpoint rpcEndpoint) {
        actorRef.tell(ControlMessages.TERMINATE, ActorRef.noSender());
        return rpcEndpoint.getTerminationFuture();
    }

    /**
     * Returns the scheduled executor created in the constructor, an
     * {@code ActorSystemScheduledExecutorAdapter} built on this service's actor system.
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public ScheduledExecutor getScheduledExecutor() {
        return this.internalScheduledExecutor;
    }

    /**
     * Derives the RPC URL and the hostname of the given actor.
     *
     * @param actorRef actor to describe
     * @return tuple of (RPC URL, hostname); the hostname is {@code "localhost"} when the
     *     actor's address carries no host
     */
    private Tuple2<String, String> extractAddressHostname(ActorRef actorRef) {
        final String rpcUrl = PekkoUtils.getRpcURL(actorSystem, actorRef);
        final Option<String> host = actorRef.path().address().host();
        // Plain literal: the fallback hostname should not be tied to a string-lookup key
        // constant of an unrelated text library that merely has the same value.
        final String hostname = host.isEmpty() ? "localhost" : host.get();
        return Tuple2.of(rpcUrl, hostname);
    }

    /**
     * Resolves the actor behind the given address, performs the version handshake with it and,
     * once both have succeeded, creates a gateway proxy on the actor system's dispatcher.
     *
     * @param str address of the remote endpoint
     * @param cls gateway interface the proxy should implement
     * @param function creates the invocation handler for the resolved actor
     * @return future of the gateway proxy, guarded so that it completes with the Flink class
     *     loader as context class loader
     * @throws IllegalStateException if the service has already been stopped
     */
    private <C extends RpcGateway> CompletableFuture<C> connectInternal(String str, Class<C> cls, Function<ActorRef, InvocationHandler> function) {
        Preconditions.checkState(!stopped, "RpcService is stopped");
        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", str, cls.getName());

        final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(str);

        // Ask the resolved actor whether it supports the gateway type and this RPC version.
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
                actorRefFuture.thenCompose(
                        (ActorRef actorRef) ->
                                ScalaFutureUtils.toJava(
                                        Patterns.ask(
                                                        actorRef,
                                                        new RemoteHandshakeMessage(cls, getVersion()),
                                                        configuration.getTimeout().toMillis())
                                                .<HandshakeSuccessMessage>mapTo(
                                                        ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(
                                                                HandshakeSuccessMessage.class))));

        final CompletableFuture<C> gatewayFuture =
                actorRefFuture.thenCombineAsync(
                        handshakeFuture,
                        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                            final InvocationHandler invocationHandler = function.apply(actorRef);
                            // The proxy implements cls, so Class.cast is a checked conversion to C.
                            return cls.cast(
                                    Proxy.newProxyInstance(
                                            getClass().getClassLoader(),
                                            new Class<?>[] {cls},
                                            invocationHandler));
                        },
                        actorSystem.dispatcher());

        return ClassLoadingUtils.guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
    }

    /**
     * Resolves the actor selected by the given address within the configured timeout.
     *
     * @param str actor address to resolve
     * @return future of the actor reference; a resolution failure is rethrown as a
     *     {@link CompletionException} wrapping an {@link RpcConnectionException}
     */
    private CompletableFuture<ActorRef> resolveActorAddress(String str) {
        final CompletableFuture<ActorRef> resolved =
                actorSystem
                        .actorSelection(str)
                        .resolveOne(configuration.getTimeout())
                        .toCompletableFuture();

        return resolved.exceptionally(failure -> {
            final String message =
                    String.format("Could not connect to rpc endpoint under address %s.", str);
            throw new CompletionException(new RpcConnectionException(message, failure));
        });
    }
}
