package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorIdentity;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.Cancellable;
import akka.actor.Identify;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.dispatch.Futures;
import akka.dispatch.Mapper;
import akka.pattern.Patterns;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.SelfGateway;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcService.class */
public class AkkaRpcService implements RpcService {
    /** Logger shared by all instances of this service. */
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcService.class);

    /** Config path looked up in the actor system's settings to determine the maximum frame size. */
    static final String MAXIMUM_FRAME_SIZE_PATH = "akka.remote.netty.tcp.maximum-frame-size";

    /** Actor system that hosts the RPC actors and provides the scheduler and dispatcher. */
    private final ActorSystem actorSystem;

    /** Timeout used for the identify request in {@code connect} and handed to every invocation handler. */
    private final Time timeout;

    /** Value found under {@link #MAXIMUM_FRAME_SIZE_PATH}, or {@code Long.MAX_VALUE} if the path is absent. */
    private final long maximumFramesize;

    /** Host of the actor system's address, or the empty string if the address defines no host. */
    private final String address;

    /** Scheduled executor backed by the actor system; returned by {@code getScheduledExecutor()}. */
    private final ScheduledExecutor internalScheduledExecutor;

    /** Set once by {@code stopService()} under {@link #lock}; volatile because {@code connect} reads it without the lock. */
    private volatile boolean stopped;

    /** Monitor guarding {@link #stopped} transitions and all access to {@link #actors}. */
    private final Object lock = new Object();

    /** Actors started through {@code startServer} that have not been stopped yet. */
    private final Set<ActorRef> actors = new HashSet<ActorRef>(4);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcService$InternalScheduledExecutorImpl.class */
    public static final class InternalScheduledExecutorImpl implements ScheduledExecutor {
        /** Actor system whose scheduler and dispatcher are used by every method of this executor. */
        private final ActorSystem actorSystem;

        /* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaRpcService$InternalScheduledExecutorImpl$ScheduledFutureTask.class */
        /**
         * {@link FutureTask} that additionally tracks a trigger time, a period and the Akka
         * {@link Cancellable} of its current scheduling, so it can be handed out as a
         * {@link RunnableScheduledFuture}.
         *
         * @param <V> result type of the task
         */
        private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
            /** Nanosecond value that {@link #getDelay} compares against the executor's clock. */
            private long time;

            /**
             * Repetition period in nanoseconds: zero means not periodic, a positive value advances
             * {@link #time} by the period after each run, a negative value makes the task
             * re-schedule itself with the negated value as delay after each run.
             */
            private final long period;

            /** Handle of the most recent Akka scheduling of this task. */
            private volatile Cancellable cancellable;

            /**
             * Creates a task around a callable.
             *
             * @param callable computation to run
             * @param j initial value of the trigger time, in nanoseconds
             * @param j2 period in nanoseconds (see {@link #period})
             */
            ScheduledFutureTask(Callable<V> callable, long j, long j2) {
                super(callable);
                this.time = j;
                this.period = j2;
            }

            /**
             * Creates a task around a runnable; the task's result is {@code null}.
             *
             * @param runnable action to run
             * @param j initial value of the trigger time, in nanoseconds
             * @param j2 period in nanoseconds (see {@link #period})
             */
            ScheduledFutureTask(Runnable runnable, long j, long j2) {
                super(runnable, null);
                this.time = j;
                this.period = j2;
            }

            /**
             * Stores the handle that {@link #cancel(boolean)} uses to cancel the Akka scheduling.
             *
             * @param cancellable handle returned by the Akka scheduler for this task
             */
            public void setCancellable(Cancellable cancellable) {
                this.cancellable = cancellable;
            }

            /**
             * Runs the task once. A periodic task is run via {@code runAndReset()} and, if that
             * succeeds, either advances its trigger time (positive period) or schedules its own
             * next execution (negative period).
             */
            @Override
            public void run() {
                if (!isPeriodic()) {
                    super.run();
                } else if (runAndReset()) {
                    if (this.period > 0) {
                        this.time += this.period;
                    } else {
                        final long delayNanos = -this.period;
                        this.cancellable = InternalScheduledExecutorImpl.this.internalSchedule(this, delayNanos, TimeUnit.NANOSECONDS);
                        if (isCancelled()) {
                            // A cancel may have happened while the next run was being scheduled;
                            // withdraw the scheduling that was just created.
                            this.cancellable.cancel();
                        } else {
                            this.time = InternalScheduledExecutorImpl.this.triggerTime(delayNanos);
                        }
                    }
                }
            }

            /**
             * Cancels the future and, only if that succeeded, the current Akka scheduling.
             *
             * @param z passed through to {@link FutureTask#cancel(boolean)}
             * @return {@code true} only if both the future and the Akka scheduling were cancelled
             */
            @Override
            public boolean cancel(boolean z) {
                final boolean futureCancelled = super.cancel(z);
                if (!futureCancelled) {
                    return false;
                }
                return this.cancellable.cancel();
            }

            /**
             * Returns the difference between the stored trigger time and the executor's clock.
             *
             * @param timeUnit unit of the returned value
             * @return {@code time - now()} converted from nanoseconds to {@code timeUnit}
             */
            @Override
            public long getDelay(@Nonnull TimeUnit timeUnit) {
                final long remainingNanos = this.time - InternalScheduledExecutorImpl.this.now();
                return timeUnit.convert(remainingNanos, TimeUnit.NANOSECONDS);
            }

            /**
             * Orders tasks by their current delay in nanoseconds.
             *
             * @param delayed the object to compare with
             * @return -1, 0 or 1 if this delay is smaller than, equal to or larger than the other
             */
            @Override
            public int compareTo(@Nonnull Delayed delayed) {
                if (this == delayed) {
                    return 0;
                }
                final long difference = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
                return Long.signum(difference);
            }

            /**
             * @return {@code true} if the period is non-zero
             */
            @Override
            public boolean isPeriodic() {
                return this.period != 0;
            }
        }

        /**
         * Creates an executor that delegates to the given actor system.
         *
         * @param actorSystem actor system providing the scheduler and dispatcher; must not be null
         */
        private InternalScheduledExecutorImpl(ActorSystem actorSystem) {
            // The message names the argument that is actually checked (it previously said "rpcService").
            this.actorSystem = (ActorSystem) Preconditions.checkNotNull(actorSystem, "actor system");
        }

        /**
         * Schedules a single execution of the given runnable after the given delay.
         *
         * @param runnable action to run
         * @param j delay before the execution
         * @param timeUnit unit of {@code j}
         * @return future representing the scheduled execution
         */
        @Override // org.apache.flink.runtime.concurrent.ScheduledExecutor
        @Nonnull
        public ScheduledFuture<?> schedule(@Nonnull Runnable runnable, long j, @Nonnull TimeUnit timeUnit) {
            // The task must hold the absolute trigger time: getDelay() subtracts now() from it,
            // so storing the relative delay would report a meaningless remaining delay.
            final ScheduledFutureTask<Void> task =
                new ScheduledFutureTask<Void>(runnable, triggerTime(timeUnit.toNanos(j)), 0L);
            task.setCancellable(internalSchedule(task, j, timeUnit));
            return task;
        }

        /**
         * Schedules a single execution of the given callable after the given delay.
         *
         * @param callable computation to run
         * @param j delay before the execution
         * @param timeUnit unit of {@code j}
         * @param <V> result type of the callable
         * @return future that yields the callable's result
         */
        @Override // org.apache.flink.runtime.concurrent.ScheduledExecutor
        @Nonnull
        public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long j, @Nonnull TimeUnit timeUnit) {
            // The task must hold the absolute trigger time: getDelay() subtracts now() from it,
            // so storing the relative delay would report a meaningless remaining delay.
            final ScheduledFutureTask<V> task =
                new ScheduledFutureTask<V>(callable, triggerTime(timeUnit.toNanos(j)), 0L);
            task.setCancellable(internalSchedule(task, j, timeUnit));
            return task;
        }

        /**
         * Schedules the runnable for repeated execution through the Akka scheduler's periodic
         * {@code schedule} call.
         *
         * @param runnable action to run
         * @param j delay before the first execution
         * @param j2 interval between executions
         * @param timeUnit unit of {@code j} and {@code j2}
         * @return future representing the periodic task
         */
        @Override
        @Nonnull
        public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable runnable, long j, long j2, @Nonnull TimeUnit timeUnit) {
            // A positive period tells the task to advance its trigger time by the period after each run.
            final long firstTriggerNanos = triggerTime(timeUnit.toNanos(j));
            final long periodNanos = timeUnit.toNanos(j2);
            final ScheduledFutureTask<Void> periodicTask =
                new ScheduledFutureTask<Void>(runnable, firstTriggerNanos, periodNanos);

            final Cancellable scheduling = this.actorSystem.scheduler().schedule(
                new FiniteDuration(j, timeUnit),
                new FiniteDuration(j2, timeUnit),
                periodicTask,
                this.actorSystem.dispatcher());

            periodicTask.setCancellable(scheduling);
            return periodicTask;
        }

        /**
         * Schedules the runnable for repeated execution; after each successful run the task
         * schedules its own next execution.
         *
         * @param runnable action to run
         * @param j delay before the first execution
         * @param j2 delay between the end of one execution and the scheduling of the next
         * @param timeUnit unit of {@code j} and {@code j2}
         * @return future representing the periodic task
         */
        @Override
        @Nonnull
        public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable runnable, long j, long j2, @Nonnull TimeUnit timeUnit) {
            // The period is stored negated; the task treats a negative period as
            // "re-schedule yourself with the absolute value as delay".
            final long firstTriggerNanos = triggerTime(timeUnit.toNanos(j));
            final long negatedDelayNanos = timeUnit.toNanos(-j2);
            final ScheduledFutureTask<Void> delayedTask =
                new ScheduledFutureTask<Void>(runnable, firstTriggerNanos, negatedDelayNanos);

            final Cancellable firstScheduling = internalSchedule(delayedTask, j, timeUnit);
            delayedTask.setCancellable(firstScheduling);
            return delayedTask;
        }

        /**
         * Hands the runnable to the actor system's dispatcher for execution.
         *
         * @param runnable action to run
         */
        @Override
        public void execute(@Nonnull Runnable runnable) {
            final Executor dispatcher = this.actorSystem.dispatcher();
            dispatcher.execute(runnable);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Schedules a single execution of the runnable on the actor system's dispatcher.
         *
         * @param runnable action to run
         * @param j delay before the execution
         * @param timeUnit unit of {@code j}
         * @return handle that cancels the scheduled execution
         */
        public Cancellable internalSchedule(Runnable runnable, long j, TimeUnit timeUnit) {
            final FiniteDuration delay = new FiniteDuration(j, timeUnit);
            return this.actorSystem.scheduler().scheduleOnce(delay, runnable, this.actorSystem.dispatcher());
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Reads the clock used for all trigger-time arithmetic of this executor.
         *
         * @return the current value of {@link System#nanoTime()}
         */
        public long now() {
            final long currentNanos = System.nanoTime();
            return currentNanos;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Converts a relative delay into a point on the executor's clock.
         *
         * @param j delay in nanoseconds
         * @return {@code now() + j}
         */
        public long triggerTime(long j) {
            final long currentNanos = now();
            return currentNanos + j;
        }
    }

    public AkkaRpcService(ActorSystem actorSystem, Time time) {
        this.actorSystem = (ActorSystem) Preconditions.checkNotNull(actorSystem, "actor system");
        this.timeout = (Time) Preconditions.checkNotNull(time, "timeout");
        if (actorSystem.settings().config().hasPath(MAXIMUM_FRAME_SIZE_PATH)) {
            this.maximumFramesize = actorSystem.settings().config().getBytes(MAXIMUM_FRAME_SIZE_PATH).longValue();
        } else {
            this.maximumFramesize = Long.MAX_VALUE;
        }
        Address address = AkkaUtils.getAddress(actorSystem);
        if (address.host().isDefined()) {
            this.address = address.host().get();
        } else {
            this.address = "";
        }
        this.internalScheduledExecutor = new InternalScheduledExecutorImpl(actorSystem);
    }

    /**
     * @return the host of the underlying actor system's address, or the empty string if the
     *     address had no host when this service was created
     */
    @Override
    public String getAddress() {
        return address;
    }

    /**
     * Resolves the actor at the given address and returns a future holding a gateway proxy for it.
     *
     * <p>An {@link Identify} message is sent to the actor selection for {@code str}. When the
     * reply carries an actor reference, a dynamic proxy implementing {@code cls} is created on
     * top of an {@code AkkaInvocationHandler}; otherwise the future fails with an
     * {@link RpcConnectionException}.
     *
     * @param str address of the remote RPC endpoint
     * @param cls gateway interface the returned proxy implements
     * @param <C> gateway type
     * @return future gateway for the remote endpoint
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public <C extends RpcGateway> Future<C> connect(final String str, final Class<C> cls) {
        Preconditions.checkState(!this.stopped, "RpcService is stopped");
        LOG.debug("Try to connect to remote RPC endpoint with address {}. Returning a {} gateway.", str, cls.getName());

        final scala.concurrent.Future<Object> identifyFuture =
            Patterns.ask(this.actorSystem.actorSelection(str), new Identify(42), this.timeout.toMilliseconds());

        final scala.concurrent.Future<C> gatewayFuture = identifyFuture.map(new Mapper<Object, C>() {
            // Return type must be C to match Mapper<Object, C>; a broader return type is not a
            // valid override.
            @Override // akka.dispatch.Mapper
            public C checkedApply(Object obj) throws Exception {
                final ActorRef ref = ((ActorIdentity) obj).getRef();
                if (ref == null) {
                    throw new RpcConnectionException("Could not connect to rpc endpoint under address " + str + '.');
                }

                final String akkaURL = AkkaUtils.getAkkaURL(AkkaRpcService.this.actorSystem, ref);
                final Option<String> host = ref.path().address().host();
                final String hostname = host.isEmpty() ? "localhost" : host.get();

                final AkkaInvocationHandler handler = new AkkaInvocationHandler(
                    akkaURL,
                    hostname,
                    ref,
                    AkkaRpcService.this.timeout,
                    AkkaRpcService.this.maximumFramesize,
                    null);

                // Safe: the proxy is created for exactly the interface cls, i.e. Class<C>.
                @SuppressWarnings("unchecked")
                final C gateway = (C) Proxy.newProxyInstance(
                    AkkaRpcService.this.getClass().getClassLoader(),
                    new Class<?>[]{cls},
                    handler);
                return gateway;
            }
        }, this.actorSystem.dispatcher());

        return new FlinkFuture<C>(gatewayFuture);
    }

    /**
     * Starts an {@code AkkaRpcActor} for the given endpoint and returns a self gateway proxy.
     *
     * <p>The actor is created and registered in the set of tracked actors under the service lock.
     * The returned proxy implements the endpoint's self gateway type together with
     * {@link SelfGateway}, {@link MainThreadExecutable}, {@link StartStoppable} and
     * {@code AkkaGateway}.
     *
     * @param s RPC endpoint to start a server for; must not be null
     * @param <C> gateway type of the endpoint
     * @param <S> endpoint type
     * @return self gateway proxy for the started endpoint
     * @throws IllegalStateException if this service has already been stopped
     */
    @Override
    public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S s) {
        Preconditions.checkNotNull(s, "rpc endpoint");

        // The same future instance is handed to the actor and to the invocation handler.
        // NOTE(review): presumably completed when the actor terminates — confirm in AkkaRpcActor.
        final FlinkCompletableFuture endpointFuture = new FlinkCompletableFuture();
        final Props actorProps = Props.create((Class<?>) AkkaRpcActor.class, s, endpointFuture);

        final ActorRef endpointActor;
        synchronized (this.lock) {
            Preconditions.checkState(!this.stopped, "RpcService is stopped");
            endpointActor = this.actorSystem.actorOf(actorProps, s.getEndpointId());
            this.actors.add(endpointActor);
        }

        LOG.info("Starting RPC endpoint for {} at {} .", s.getClass().getName(), endpointActor.path());

        final String akkaURL = AkkaUtils.getAkkaURL(this.actorSystem, endpointActor);
        final Option<String> host = endpointActor.path().address().host();

        final Class[] gatewayInterfaces = new Class[]{
            s.getSelfGatewayType(),
            SelfGateway.class,
            MainThreadExecutable.class,
            StartStoppable.class,
            AkkaGateway.class
        };

        final String hostname;
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }

        final AkkaInvocationHandler handler = new AkkaInvocationHandler(
            akkaURL, hostname, endpointActor, this.timeout, this.maximumFramesize, endpointFuture);

        return (C) Proxy.newProxyInstance(getClass().getClassLoader(), gatewayInterfaces, handler);
    }

    /**
     * Stops the RPC actor behind the given gateway by sending it a {@link PoisonPill}.
     *
     * <p>Nothing happens if the gateway is not an {@code AkkaGateway}, if this service is already
     * stopped, or if the gateway's actor is not tracked by this service (already stopped, or
     * started by a different service).
     *
     * @param rpcGateway gateway of the endpoint to stop
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public void stopServer(RpcGateway rpcGateway) {
        if (!(rpcGateway instanceof AkkaGateway)) {
            return;
        }
        final AkkaGateway akkaGateway = (AkkaGateway) rpcGateway;

        synchronized (this.lock) {
            if (this.stopped) {
                return;
            }

            final ActorRef rpcEndpoint = akkaGateway.getRpcEndpoint();
            if (!this.actors.remove(rpcEndpoint)) {
                // Supply the argument for the "{}" placeholder; it was previously logged verbatim.
                LOG.debug("RPC endpoint {} already stopped or from different RPC service", rpcEndpoint);
                return;
            }

            LOG.info("Stopping RPC endpoint {}.", rpcEndpoint.path());
            rpcEndpoint.tell(PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    /**
     * Stops this service: marks it as stopped, shuts down the actor system, forgets all tracked
     * actors and then blocks until the actor system has terminated.
     *
     * <p>Only the first call performs the shutdown; later calls return once they observe the
     * stopped flag.
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public void stopService() {
        LOG.info("Stopping Akka RPC service.");

        synchronized (this.lock) {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
            this.actorSystem.shutdown();
            this.actors.clear();
        }

        // Wait outside the lock: blocking here while holding it would stall every thread calling
        // startServer/stopServer, and could deadlock if such a call comes from a thread the actor
        // system needs in order to terminate.
        this.actorSystem.awaitTermination();
    }

    /**
     * Returns a future that completes once the actor system has terminated.
     *
     * <p>The wait is performed by a task submitted to {@link #getExecutor()} that blocks in
     * {@code awaitTermination()}.
     *
     * @return future completed with {@code null} after the actor system's termination
     */
    @Override
    public Future<Void> getTerminationFuture() {
        final Callable<Void> awaitActorSystemTermination = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                AkkaRpcService.this.actorSystem.awaitTermination();
                return null;
            }
        };
        return FlinkFuture.supplyAsync(awaitActorSystemTermination, getExecutor());
    }

    /**
     * @return the dispatcher of the underlying actor system
     */
    @Override
    public Executor getExecutor() {
        final Executor dispatcher = this.actorSystem.dispatcher();
        return dispatcher;
    }

    /**
     * @return the scheduled executor created for this service in its constructor
     */
    @Override
    public ScheduledExecutor getScheduledExecutor() {
        return internalScheduledExecutor;
    }

    /**
     * Validates the arguments and schedules a single execution of the runnable on the internal
     * scheduled executor.
     *
     * @param runnable action to run; must not be null
     * @param j delay before the execution; must not be negative
     * @param timeUnit unit of {@code j}; must not be null
     * @return future representing the scheduled execution
     */
    @Override
    public ScheduledFuture<?> scheduleRunnable(Runnable runnable, long j, TimeUnit timeUnit) {
        Preconditions.checkNotNull(runnable, "runnable");
        Preconditions.checkNotNull(timeUnit, "unit");

        final boolean delayIsNonNegative = j >= 0;
        Preconditions.checkArgument(delayIsNonNegative, "delay must be zero or larger");

        final ScheduledFuture<?> scheduled = this.internalScheduledExecutor.schedule(runnable, j, timeUnit);
        return scheduled;
    }

    /**
     * Hands the runnable to the actor system's dispatcher for execution.
     *
     * @param runnable action to run
     */
    @Override
    public void execute(Runnable runnable) {
        final Executor dispatcher = this.actorSystem.dispatcher();
        dispatcher.execute(runnable);
    }

    /**
     * Runs the callable on the actor system's dispatcher and exposes its outcome as a future.
     *
     * @param callable computation to run
     * @param <T> result type of the callable
     * @return future wrapping the Akka future created for the callable
     */
    @Override // org.apache.flink.runtime.rpc.RpcService
    public <T> Future<T> execute(Callable<T> callable) {
        // Explicit type argument instead of a raw FlinkFuture, so the result is type-checked.
        return new FlinkFuture<T>(Futures.future(callable, this.actorSystem.dispatcher()));
    }
}
