package org.apache.flink.runtime.rpc.akka;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaHandshakeException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcInvalidStateException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor.class */
public class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends AbstractActor {
    /** Logger named after the runtime class of this actor (it is created via {@code getClass()}). */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Endpoint this actor serves; RPC invocations are resolved and invoked reflectively on it. */
    protected final T rpcEndpoint;
    /** Class loader handed to {@code ClassLoadingUtils.runWithContextClassLoader} whenever endpoint code is run. */
    private final ClassLoader flinkClassLoader;
    /** Entered before and exited after endpoint code is executed by this actor. */
    private final MainThreadValidatorUtil mainThreadValidator;
    /** Completed in {@link #postStop()}: normally on success, exceptionally with the failure cause otherwise. */
    private final CompletableFuture<Boolean> terminationFuture;
    /** Local version number; a remote handshake is only accepted when it carries the same value. */
    private final int version;
    /** Upper bound for the serialized size of an RPC result; larger results are answered with a failure. */
    private final long maximumFramesize;
    /** Flipped to {@code true} by the first call to {@code stop(...)}; later calls are no-ops. */
    private final AtomicBoolean rpcEndpointStopped;
    /** When {@code true}, results for local senders are serialized as well, except for methods annotated with {@code @Local}. */
    private final boolean forceSerialization;
    /**
     * Outcome reported by {@link #postStop()}. The constructor initialises it to a failure
     * ("has not been properly stopped"), so a stop that bypasses {@code stop(...)} is reported as failed.
     */
    private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;

    /** Current lifecycle state; the constructor sets it to {@code StoppedState.STOPPED}. */
    @Nonnull
    private State state;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor$RpcEndpointTerminationResult.class */
    /**
     * Outcome of stopping the RPC endpoint: either a success (no cause) or a failure that carries
     * the {@link Throwable} responsible for it.
     */
    public static final class RpcEndpointTerminationResult {
        /** Shared instance used for every successful termination. */
        private static final RpcEndpointTerminationResult SUCCESS = new RpcEndpointTerminationResult(null);

        /** Cause of the failed termination; {@code null} marks a success. */
        @Nullable
        private final Throwable failureCause;

        private RpcEndpointTerminationResult(@Nullable Throwable th) {
            failureCause = th;
        }

        /** @return {@code true} when no failure cause has been recorded */
        public boolean isSuccess() {
            return failureCause == null;
        }

        /**
         * Returns the recorded failure cause.
         *
         * <p>Must only be called on a failed result; the state check rejects a successful one.
         */
        public Throwable getFailureCause() {
            Preconditions.checkState(!isSuccess());
            return failureCause;
        }

        private static RpcEndpointTerminationResult success() {
            return SUCCESS;
        }

        /** Creates a result that carries {@code th} as its failure cause. */
        /* JADX INFO: Access modifiers changed from: private */
        public static RpcEndpointTerminationResult failure(Throwable th) {
            return new RpcEndpointTerminationResult(th);
        }

        /** Maps a possibly absent throwable to the shared success instance or to a new failure. */
        /* JADX INFO: Access modifiers changed from: private */
        public static RpcEndpointTerminationResult of(@Nullable Throwable th) {
            if (th == null) {
                return SUCCESS;
            }
            return new RpcEndpointTerminationResult(th);
        }

        /** Compiler-generated bridge to {@link #success()}; kept because existing code calls it by name. */
        static /* synthetic */ RpcEndpointTerminationResult access$400() {
            return SUCCESS;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor$StartedState.class */
    /** Lifecycle state of an actor whose endpoint has been started and is processing messages. */
    public enum StartedState implements State {
        STARTED;

        /** Starting an already started actor is a no-op. */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State start(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            return STARTED;
        }

        /** Stops message processing without terminating the endpoint. */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State stop() {
            return StoppedState.STOPPED;
        }

        /**
         * Runs the endpoint's stop callback and stops the actor once the returned future completes.
         *
         * <p>A throwable raised by the callback itself is converted into an exceptionally completed
         * future wrapping an {@link AkkaRpcException}, so both paths end in
         * {@code akkaRpcActor.stop(...)}.
         *
         * @param akkaRpcActor actor whose endpoint is being terminated
         * @param classLoader context class loader to install while the callback runs
         * @return {@link TerminatingState#TERMINATING}
         */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State terminate(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            akkaRpcActor.mainThreadValidator.enterMainThread();

            CompletableFuture<?> onStopFuture;
            try {
                onStopFuture = ClassLoadingUtils.runWithContextClassLoader(
                        () -> akkaRpcActor.rpcEndpoint.internalCallOnStop(), classLoader);
            } catch (Throwable th) {
                onStopFuture = FutureUtils.completedExceptionally(
                        new AkkaRpcException(
                                String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
                                th));
            } finally {
                // Exit exactly once and outside the guarded region, so a failure of the exit
                // itself is neither reported as an endpoint failure nor retried.
                akkaRpcActor.mainThreadValidator.exitMainThread();
            }

            onStopFuture.whenComplete(
                    (ignored, failure) -> akkaRpcActor.stop(RpcEndpointTerminationResult.of(failure)));
            return TerminatingState.TERMINATING;
        }

        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public boolean isRunning() {
            return true;
        }
    }

    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor$State.class */
    /**
     * Lifecycle state of the actor. Every transition returns the state to move to; the defaults
     * reject {@code start}, {@code stop} and {@code terminate}, so each implementation only
     * overrides the transitions it allows.
     */
    interface State {
        /** Transition requested by a START control message; rejected by default. */
        default State start(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.STARTED));
        }

        /** Transition requested by a STOP control message; rejected by default. */
        default State stop() {
            throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.STOPPED));
        }

        /** Transition requested by a TERMINATE control message; rejected by default. */
        default State terminate(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.TERMINATING));
        }

        /** Final transition, applied from {@code postStop()}; allowed from every state by default. */
        default State finishTermination() {
            return TerminatedState.TERMINATED;
        }

        /** Whether RPC messages are processed in this state; {@code false} unless overridden. */
        default boolean isRunning() {
            return false;
        }

        /** Builds the error text for a rejected transition from this state into {@code state}. */
        default String invalidStateTransitionMessage(State state) {
            return String.format("AkkaRpcActor is currently in state %s and cannot go into state %s.", this, state);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor$StoppedState.class */
    /** Lifecycle state of an actor whose endpoint is not processing messages. */
    public enum StoppedState implements State {
        STOPPED;

        /**
         * Runs the endpoint's start callback.
         *
         * <p>If the callback throws, the actor is asked to stop with a failure result wrapping the
         * throwable. The returned state is {@link StartedState#STARTED} in either case.
         *
         * @param akkaRpcActor actor whose endpoint is being started
         * @param classLoader context class loader to install while the callback runs
         * @return {@link StartedState#STARTED}
         */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State start(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            akkaRpcActor.mainThreadValidator.enterMainThread();
            try {
                ClassLoadingUtils.runWithContextClassLoader(
                        () -> akkaRpcActor.rpcEndpoint.internalCallOnStart(), classLoader);
            } catch (Throwable th) {
                akkaRpcActor.stop(
                        RpcEndpointTerminationResult.failure(
                                new AkkaRpcException(
                                        String.format("Could not start RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
                                        th)));
            } finally {
                // Exit exactly once and outside the guarded region, so a failure of the exit
                // itself is neither reported as a start failure nor retried.
                akkaRpcActor.mainThreadValidator.exitMainThread();
            }
            return StartedState.STARTED;
        }

        /** Stopping an already stopped actor is a no-op. */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State stop() {
            return STOPPED;
        }

        /**
         * Stops the actor directly with a success result; the endpoint's stop callback is not
         * invoked from this state.
         */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State terminate(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            // Call the factory directly instead of the compiler-generated access$400() bridge.
            akkaRpcActor.stop(RpcEndpointTerminationResult.success());
            return TerminatingState.TERMINATING;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor$TerminatedState.class */
    /**
     * Final lifecycle state, entered via {@code finishTermination()}. It overrides nothing, so
     * {@code start}, {@code stop} and {@code terminate} all throw and {@code isRunning()} is false.
     */
    public enum TerminatedState implements State {
        TERMINATED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaRpcActor$TerminatingState.class */
    /** Lifecycle state between a terminate request and the actor's {@code postStop()}. */
    public enum TerminatingState implements State {
        TERMINATING;

        /** A repeated terminate request is a no-op. */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public State terminate(AkkaRpcActor<?> akkaRpcActor, ClassLoader classLoader) {
            return TERMINATING;
        }

        /** RPC messages are still processed while terminating. */
        @Override // org.apache.flink.runtime.rpc.akka.AkkaRpcActor.State
        public boolean isRunning() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AkkaRpcActor(T t, CompletableFuture<Boolean> completableFuture, int i, long j, boolean z, ClassLoader classLoader) {
        Preconditions.checkArgument(j > 0, "Maximum framesize must be positive.");
        this.rpcEndpoint = (T) ((RpcEndpoint) Preconditions.checkNotNull(t, "rpc endpoint"));
        this.flinkClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.forceSerialization = z;
        this.mainThreadValidator = new MainThreadValidatorUtil(t);
        this.terminationFuture = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        this.version = i;
        this.maximumFramesize = j;
        this.rpcEndpointStopped = new AtomicBoolean(false);
        this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(new AkkaRpcException(String.format("RpcEndpoint %s has not been properly stopped.", t.getEndpointId())));
        this.state = StoppedState.STOPPED;
    }

    /**
     * Publishes the recorded termination result through the termination future and moves the
     * actor into its final state.
     */
    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public void postStop() throws Exception {
        super.postStop();

        final RpcEndpointTerminationResult outcome = rpcEndpointTerminationResult;
        if (!outcome.isSuccess()) {
            final Throwable cause = outcome.getFailureCause();
            log.info("The RpcEndpoint {} failed.", rpcEndpoint.getEndpointId(), cause);
            terminationFuture.completeExceptionally(cause);
        } else {
            log.debug("The RpcEndpoint {} terminated successfully.", rpcEndpoint.getEndpointId());
            terminationFuture.complete(null);
        }

        state = state.finishTermination();
    }

    /**
     * Builds the message dispatch: handshake messages first, then control messages, and every
     * other message falls through to {@code handleMessage}.
     */
    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        ReceiveBuilder dispatch = ReceiveBuilder.create();
        dispatch = dispatch.match(RemoteHandshakeMessage.class, this::handleHandshakeMessage);
        dispatch = dispatch.match(ControlMessages.class, this::handleControlMessage);
        dispatch = dispatch.matchAny(this::handleMessage);
        return dispatch.build();
    }

    /**
     * Processes a non-control, non-handshake message. While the current state is running, the
     * message is handled inside the main-thread bracket; otherwise it is discarded and the
     * sender is answered with an {@link EndpointNotStartedException}.
     */
    private void handleMessage(Object obj) {
        if (state.isRunning()) {
            mainThreadValidator.enterMainThread();
            try {
                handleRpcMessage(obj);
            } finally {
                mainThreadValidator.exitMainThread();
            }
            return;
        }

        log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", rpcEndpoint.getClass().getName(), obj);
        final String reason =
                String.format("Discard message %s, because the rpc endpoint %s has not been started yet.", obj, rpcEndpoint.getAddress());
        sendErrorIfSender(new EndpointNotStartedException(reason));
    }

    /**
     * Applies a lifecycle control message to the current state. If the transition throws, the
     * exception is recorded as the termination result and then rethrown.
     */
    private void handleControlMessage(ControlMessages controlMessages) {
        try {
            final State successor;
            switch (controlMessages) {
                case START:
                    successor = state.start(this, flinkClassLoader);
                    break;
                case STOP:
                    successor = state.stop();
                    break;
                case TERMINATE:
                    successor = state.terminate(this, flinkClassLoader);
                    break;
                default:
                    // Unknown control messages leave the state untouched.
                    handleUnknownControlMessage(controlMessages);
                    return;
            }
            state = successor;
        } catch (Exception e) {
            rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
            throw e;
        }
    }

    /** Logs an unrecognised control message and reports it to the sender as a failure. */
    private void handleUnknownControlMessage(ControlMessages controlMessages) {
        final String description =
                String.format("Received unknown control message %s. Dropping this message!", controlMessages);
        log.warn(description);
        sendErrorIfSender(new AkkaUnknownMessageException(description));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Routes an RPC message by type: {@link RunAsync}, {@link CallAsync} or {@link RpcInvocation}.
     * Any other message is logged and answered with an {@link AkkaUnknownMessageException}.
     */
    public void handleRpcMessage(Object obj) {
        if (obj instanceof RunAsync) {
            handleRunAsync((RunAsync) obj);
            return;
        }
        if (obj instanceof CallAsync) {
            handleCallAsync((CallAsync) obj);
            return;
        }
        if (obj instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) obj);
            return;
        }

        log.warn("Received message of unknown type {} with value {}. Dropping this message!", obj.getClass().getName(), obj);
        final String description =
                "Received unknown message " + obj + " of type " + obj.getClass().getSimpleName() + '.';
        sendErrorIfSender(new AkkaUnknownMessageException(description));
    }

    /**
     * Answers a remote handshake: success when the version matches and the endpoint implements
     * the requested gateway, otherwise an {@link AkkaHandshakeException}. The version is checked
     * before the gateway.
     */
    private void handleHandshakeMessage(RemoteHandshakeMessage remoteHandshakeMessage) {
        final int remoteVersion = remoteHandshakeMessage.getVersion();
        if (!isCompatibleVersion(remoteVersion)) {
            final String mismatch =
                    String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.", remoteVersion, getVersion());
            sendErrorIfSender(new AkkaHandshakeException(mismatch));
            return;
        }

        final Class<?> requestedGateway = remoteHandshakeMessage.getRpcGateway();
        if (!isGatewaySupported(requestedGateway)) {
            final String unsupported =
                    String.format("The rpc endpoint does not support the gateway %s.", requestedGateway.getSimpleName());
            sendErrorIfSender(new AkkaHandshakeException(unsupported));
            return;
        }

        getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), getSelf());
    }

    /** Returns whether the endpoint's runtime class is assignable to the requested gateway type. */
    private boolean isGatewaySupported(Class<?> cls) {
        final Class<?> endpointType = rpcEndpoint.getClass();
        return cls.isAssignableFrom(endpointType);
    }

    /** A remote version is compatible only when it equals the local version exactly. */
    private boolean isCompatibleVersion(int i) {
        return this.version == i;
    }

    /** Returns the local version number that remote handshakes are compared against. */
    private int getVersion() {
        return this.version;
    }

    /**
     * Resolves the method named by the invocation on the endpoint, invokes it reflectively with
     * the flink class loader as context class loader, and answers the sender.
     *
     * <ul>
     *   <li>Unknown method: the sender receives a failure wrapping an {@link RpcConnectionException}.
     *   <li>{@code void} method: nothing is sent back on success.
     *   <li>Non-void method: the result is sent asynchronously when it is a
     *       {@link CompletableFuture}, synchronously otherwise.
     *   <li>Exception thrown by a non-void target method: the sender receives the unwrapped
     *       target exception.
     *   <li>Any other throwable, including an invocation failure of a {@code void} method: it is
     *       logged at error level and sent back as is.
     * </ul>
     *
     * @param rpcInvocation invocation carrying the method name, parameter types and arguments
     */
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        final Method rpcMethod;
        try {
            rpcMethod = lookupRpcMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterTypes());
        } catch (NoSuchMethodException e) {
            this.log.error("Could not find rpc method for rpc invocation.", e);
            getSender().tell(new Status.Failure(new RpcConnectionException("Could not find rpc method for rpc invocation.", e)), getSelf());
            return;
        }

        try {
            // Allows invoking public methods whose declaring class is not itself accessible.
            rpcMethod.setAccessible(true);

            if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                // No return value to send back.
                ClassLoadingUtils.runWithContextClassLoader(
                        () -> rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs()),
                        this.flinkClassLoader);
                return;
            }

            final Object result;
            try {
                result = ClassLoadingUtils.runWithContextClassLoader(
                        () -> rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs()),
                        this.flinkClassLoader);
            } catch (InvocationTargetException e) {
                // The target method itself failed: report its own exception, not the reflection wrapper.
                this.log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);
                getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                return;
            }

            final String methodName = rpcMethod.getName();
            final boolean isLocalRpcInvocation = rpcMethod.getAnnotation(Local.class) != null;
            if (result instanceof CompletableFuture) {
                sendAsyncResponse((CompletableFuture<?>) result, methodName, isLocalRpcInvocation);
            } else {
                sendSyncResponse(result, methodName, isLocalRpcInvocation);
            }
        } catch (Throwable th) {
            this.log.error("Error while executing remote procedure call {}.", rpcMethod, th);
            getSender().tell(new Status.Failure(th), getSelf());
        }
    }

    /**
     * Sends an already available result to the sender. The result is serialized and
     * size-checked when the sender is remote, or when serialization is forced and the method is
     * not a local-only invocation; otherwise it is sent as is.
     *
     * @param obj result of the RPC method
     * @param str name of the RPC method, used in error messages
     * @param z whether the method is annotated as a local invocation
     */
    private void sendSyncResponse(Object obj, String str, boolean z) {
        final ActorRef recipient = getSender();
        final boolean mustSerialize = isRemoteSender(recipient) || (forceSerialization && !z);

        final Object reply;
        if (mustSerialize) {
            final Either<AkkaRpcSerializedValue, AkkaRpcException> serialized =
                    serializeRemoteResultAndVerifySize(obj, str);
            if (serialized.isLeft()) {
                reply = new Status.Success(serialized.left());
            } else {
                reply = new Status.Failure(serialized.right());
            }
        } else {
            reply = new Status.Success(obj);
        }
        recipient.tell(reply, getSelf());
    }

    /**
     * Pipes the eventual outcome of {@code completableFuture} to the current sender. The same
     * serialization rule as for synchronous responses applies once the future completes.
     *
     * @param completableFuture future returned by the RPC method
     * @param str name of the RPC method, used in error messages
     * @param z whether the method is annotated as a local invocation
     */
    private void sendAsyncResponse(CompletableFuture<?> completableFuture, String str, boolean z) {
        // Captured now: the future may complete after this message has been processed.
        final ActorRef recipient = getSender();
        final Promise.DefaultPromise<Object> reply = new Promise.DefaultPromise<>();

        FutureUtils.assertNoException(
                completableFuture.handle(
                        (value, failure) -> {
                            if (failure != null) {
                                reply.failure(failure);
                            } else if (isRemoteSender(recipient) || (forceSerialization && !z)) {
                                final Either<AkkaRpcSerializedValue, AkkaRpcException> serialized =
                                        serializeRemoteResultAndVerifySize(value, str);
                                if (serialized.isLeft()) {
                                    reply.success(serialized.left());
                                } else {
                                    reply.failure(serialized.right());
                                }
                            } else {
                                reply.success(new Status.Success(value));
                            }
                            // The throwable has been forwarded through the promise; nothing to propagate.
                            return null;
                        }));

        Patterns.pipe(reply.future(), getContext().dispatcher()).to(recipient);
    }

    /** A sender is remote when the address of its actor path does not have local scope. */
    private boolean isRemoteSender(ActorRef actorRef) {
        final boolean localScope = actorRef.path().address().hasLocalScope();
        return !localScope;
    }

    /**
     * Serializes an RPC result and checks it against the maximum frame size.
     *
     * @param obj result to serialize
     * @param str name of the RPC method, used in error messages
     * @return left: the serialized value; right: the exception describing why the result cannot
     *     be sent (too large, or serialization failed)
     */
    private Either<AkkaRpcSerializedValue, AkkaRpcException> serializeRemoteResultAndVerifySize(Object obj, String str) {
        final AkkaRpcSerializedValue serialized;
        try {
            serialized = AkkaRpcSerializedValue.valueOf(obj);
        } catch (IOException e) {
            return Either.Right(new AkkaRpcException("Failed to serialize the result for RPC call : " + str + '.', e));
        }

        final long length = serialized.getSerializedDataLength();
        if (length <= maximumFramesize) {
            return Either.Left(serialized);
        }
        return Either.Right(new AkkaRpcException("The method " + str + "'s result size " + length + " exceeds the maximum size " + maximumFramesize + " ."));
    }

    /**
     * Runs the callable of a {@link CallAsync} message with the flink class loader as context
     * class loader and answers the sender with its result, or with the throwable it raised.
     */
    private void handleCallAsync(CallAsync callAsync) {
        try {
            final Object result = ClassLoadingUtils.runWithContextClassLoader(
                    () -> callAsync.getCallable().call(), flinkClassLoader);
            getSender().tell(new Status.Success(result), getSelf());
        } catch (Throwable th) {
            getSender().tell(new Status.Failure(th), getSelf());
        }
    }

    /**
     * Runs the runnable of a {@link RunAsync} message, or re-schedules the message to this actor
     * when its target time lies in the future. A target time of {@code 0} means "run now".
     */
    private void handleRunAsync(RunAsync runAsync) {
        final long targetNanos = runAsync.getTimeNanos();
        // The clock is only consulted when a target time is set.
        final long remainingNanos = targetNanos == 0 ? 0L : targetNanos - System.nanoTime();

        if (remainingNanos > 0) {
            // Not due yet: send a fresh message to ourselves after the remaining delay.
            final FiniteDuration delay = new FiniteDuration(remainingNanos, TimeUnit.NANOSECONDS);
            final Object selfMessage = envelopeSelfMessage(new RunAsync(runAsync.getRunnable(), targetNanos));
            getContext()
                    .system()
                    .scheduler()
                    .scheduleOnce(delay, getSelf(), selfMessage, getContext().dispatcher(), ActorRef.noSender());
            return;
        }

        try {
            ClassLoadingUtils.runWithContextClassLoader(() -> runAsync.getRunnable().run(), flinkClassLoader);
        } catch (Throwable th) {
            log.error("Caught exception while executing runnable in main thread.", th);
            ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
        }
    }

    /**
     * Finds the public method with the given name and parameter types on the endpoint's runtime
     * class.
     *
     * @throws NoSuchMethodException if the endpoint has no matching public method
     */
    private Method lookupRpcMethod(String str, Class<?>[] clsArr) throws NoSuchMethodException {
        final Class<?> endpointType = rpcEndpoint.getClass();
        return endpointType.getMethod(str, clsArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports {@code th} to the current sender as a failure, unless the sender equals
     * {@code ActorRef.noSender()}.
     */
    public void sendErrorIfSender(Throwable th) {
        final boolean hasSender = !getSender().equals(ActorRef.noSender());
        if (hasSender) {
            getSender().tell(new Status.Failure(th), getSelf());
        }
    }

    /**
     * Hook applied to a message before this actor schedules it to itself (used when a delayed
     * {@code RunAsync} is re-sent). This implementation returns the message unchanged.
     * NOTE(review): being protected, this is presumably meant for subclasses to wrap the message; confirm.
     */
    protected Object envelopeSelfMessage(Object obj) {
        return obj;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the termination result and asks the actor context to stop this actor. Only the
     * first call has any effect; later calls leave the recorded result untouched.
     */
    public void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) {
        final boolean firstCall = rpcEndpointStopped.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.rpcEndpointTerminationResult = rpcEndpointTerminationResult;
        getContext().stop(getSelf());
    }
}
