package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.AddressFromURIString;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.math3.geometry.VectorFormat;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaUtils.class */
public class AkkaUtils {
    /** Logger for this class; its enabled levels are also consulted by {@code getLogLevel()} to pick the Akka log level. */
    private static final Logger LOG = LoggerFactory.getLogger(AkkaUtils.class);
    /** Actor system name returned by {@code getFlinkActorSystemName()}. */
    private static final String FLINK_ACTOR_SYSTEM_NAME = "flink";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:flink-rpc-akka.jar:org/apache/flink/runtime/rpc/akka/AkkaUtils$AkkaConfigBuilder.class */
    /**
     * Small line-oriented builder for Akka configuration text.
     *
     * <p>Each {@link #add(String)} call appends one line to an in-memory buffer; {@link #build()}
     * parses the accumulated text and resolves substitutions.
     */
    public static class AkkaConfigBuilder {
        /** In-memory sink holding all lines added so far. */
        private final StringWriter buffer = new StringWriter();
        /** Line writer on top of {@link #buffer}; provides {@code println} semantics. */
        private final PrintWriter lineWriter = new PrintWriter(buffer);

        /** Instances are only created from within the enclosing class. */
        private AkkaConfigBuilder() {
        }

        /**
         * Appends a single line of configuration text.
         *
         * @param str the line to append (a line terminator is added automatically)
         * @return this builder, to allow call chaining
         */
        public AkkaConfigBuilder add(String str) {
            lineWriter.println(str);
            return this;
        }

        /**
         * Parses everything added so far and resolves it.
         *
         * @return the parsed and resolved configuration
         */
        public Config build() {
            final String configText = buffer.toString();
            return ConfigFactory.parseString(configText).resolve();
        }
    }

    /** Package-private no-op constructor; every member of this class is static. */
    AkkaUtils() {
    }

    /**
     * Returns the actor system name held in {@code FLINK_ACTOR_SYSTEM_NAME}.
     *
     * @return the constant actor system name
     */
    public static String getFlinkActorSystemName() {
        return FLINK_ACTOR_SYSTEM_NAME;
    }

    /**
     * Builds the base Akka configuration (logging, fatal-error handling, supervisor strategy and
     * dispatcher throughput) from the given Flink configuration.
     *
     * @param configuration source of the dispatcher throughput, JVM-exit-on-fatal-error and
     *     log-lifecycle-events options
     * @return the parsed and resolved base configuration
     */
    private static Config getBasicAkkaConfig(Configuration configuration) {
        final int dispatcherThroughput = configuration.getInteger(AkkaOptions.DISPATCHER_THROUGHPUT);
        final String jvmExitOnFatalError =
                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR));
        final String logLifecycleEvents =
                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
        final String logLevel = getLogLevel();
        final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName();

        return new AkkaConfigBuilder()
                .add("akka {")
                .add("  daemonic = off")
                .add("  loggers = [\"akka.event.slf4j.Slf4jLogger\"]")
                .add("  logging-filter = \"akka.event.slf4j.Slf4jLoggingFilter\"")
                .add("  log-config-on-start = off")
                .add("  logger-startup-timeout = 50s")
                .add("  loglevel = " + logLevel)
                .add("  stdout-loglevel = OFF")
                // Dead-letter logging follows the same switch as lifecycle-event logging.
                .add("  log-dead-letters = " + logLifecycleEvents)
                .add("  log-dead-letters-during-shutdown = " + logLifecycleEvents)
                .add("  jvm-exit-on-fatal-error = " + jvmExitOnFatalError)
                .add("  serialize-messages = off")
                .add("  actor {")
                .add("    guardian-supervisor-strategy = " + supervisorStrategy)
                .add("    warn-about-java-serializer-usage = off")
                .add("    allow-java-serialization = on")
                .add("    default-dispatcher {")
                .add("      throughput = " + dispatcherThroughput)
                .add("    }")
                .add("    supervisor-dispatcher {")
                .add("      type = Dispatcher")
                .add("      executor = \"thread-pool-executor\"")
                .add("      thread-pool-executor {")
                .add("        core-pool-size-min = 1")
                .add("        core-pool-size-max = 1")
                .add("      }")
                .add("    }")
                .add("  }")
                // Plain closing brace of the "akka" block (previously borrowed from an unrelated
                // commons-math3 formatting constant with the same value).
                .add("}")
                .build();
    }

    /**
     * Maps the most verbose level enabled on this class's SLF4J logger to an Akka log level name.
     *
     * <p>Trace and debug both map to {@code "DEBUG"}; if no level is enabled the result is
     * {@code "OFF"}.
     *
     * @return one of {@code "DEBUG"}, {@code "INFO"}, {@code "WARNING"}, {@code "ERROR"} or
     *     {@code "OFF"}
     */
    private static String getLogLevel() {
        if (LOG.isTraceEnabled() || LOG.isDebugEnabled()) {
            return "DEBUG";
        }
        if (LOG.isInfoEnabled()) {
            return "INFO";
        }
        if (LOG.isWarnEnabled()) {
            return "WARNING";
        }
        if (LOG.isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Builds the default-dispatcher configuration for a fixed thread pool executor.
     *
     * @param fixedThreadPoolExecutorConfiguration supplies the thread priority and the minimum and
     *     maximum number of threads
     * @return the parsed and resolved dispatcher configuration
     */
    public static Config getThreadPoolExecutorConfig(RpcSystem.FixedThreadPoolExecutorConfiguration fixedThreadPoolExecutorConfiguration) {
        return new AkkaConfigBuilder()
                .add("akka {")
                .add("  actor {")
                .add("    default-dispatcher {")
                .add("      type = org.apache.flink.runtime.rpc.akka.PriorityThreadsDispatcher")
                .add("      executor = thread-pool-executor")
                .add("      thread-priority = " + fixedThreadPoolExecutorConfiguration.getThreadPriority())
                .add("      thread-pool-executor {")
                .add("          core-pool-size-min = " + fixedThreadPoolExecutorConfiguration.getMinNumThreads())
                .add("          core-pool-size-max = " + fixedThreadPoolExecutorConfiguration.getMaxNumThreads())
                .add("      }")
                .add("    }")
                .add("  }")
                // Plain closing brace of the "akka" block (no dependency on an unrelated constant).
                .add("}")
                .build();
    }

    /**
     * Builds the default-dispatcher configuration for a fork-join executor.
     *
     * @param forkJoinExecutorConfiguration supplies the parallelism factor and the minimum and
     *     maximum parallelism
     * @return the parsed and resolved dispatcher configuration
     */
    public static Config getForkJoinExecutorConfig(RpcSystem.ForkJoinExecutorConfiguration forkJoinExecutorConfiguration) {
        return new AkkaConfigBuilder()
                .add("akka {")
                .add("  actor {")
                .add("    default-dispatcher {")
                .add("      executor = fork-join-executor")
                .add("      fork-join-executor {")
                .add("          parallelism-factor = " + forkJoinExecutorConfiguration.getParallelismFactor())
                .add("          parallelism-min = " + forkJoinExecutorConfiguration.getMinParallelism())
                .add("          parallelism-max = " + forkJoinExecutorConfiguration.getMaxParallelism())
                .add("      }")
                .add("    }")
                .add("  }")
                // Plain closing brace of the "akka" block (no dependency on an unrelated constant).
                .add("}")
                .build();
    }

    /**
     * Assembles the remoting configuration from its three sections: base remoting settings,
     * hostnames, and SSL.
     *
     * @param configuration Flink configuration the sections read their options from
     * @param str hostname written as {@code bind-hostname}
     * @param i port written as {@code bind-port}
     * @param str2 hostname that is normalized and written as {@code hostname}
     * @param i2 port written as {@code port}
     * @return the parsed and resolved remoting configuration
     */
    private static Config getRemoteAkkaConfig(Configuration configuration, String str, int i, String str2, int i2) {
        final AkkaConfigBuilder builder = new AkkaConfigBuilder();

        // All three sections are appended to the same text buffer and parsed together at the end.
        addBaseRemoteAkkaConfig(builder, configuration, i, i2);
        addHostnameRemoteAkkaConfig(builder, str, str2);
        addSslRemoteAkkaConfig(builder, configuration);

        return builder.build();
    }

    /**
     * Appends the base classic-remoting section (provider, startup timeout, failure detector,
     * Netty TCP transport and worker pools) to the given builder.
     *
     * @param akkaConfigBuilder builder the section is appended to
     * @param configuration Flink configuration providing timeouts, frame size, pool sizes and
     *     logging options
     * @param i port written as {@code bind-port}
     * @param i2 port written as {@code port}
     */
    private static void addBaseRemoteAkkaConfig(AkkaConfigBuilder akkaConfigBuilder, Configuration configuration, int i, int i2) {
        // The startup timeout falls back to ten times the ask timeout when not set explicitly.
        final String defaultStartupTimeout =
                TimeUtils.getStringInMillis(
                        ((Duration) configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION)).multipliedBy(10L));
        final String startupTimeout =
                TimeUtils.getStringInMillis(
                        TimeUtils.parseDuration(
                                configuration.getString(AkkaOptions.STARTUP_TIMEOUT, defaultStartupTimeout)));
        final String tcpTimeout =
                TimeUtils.getStringInMillis(
                        TimeUtils.parseDuration(configuration.getString(AkkaOptions.TCP_TIMEOUT)));
        final String maximumFrameSize = configuration.getString(AkkaOptions.FRAMESIZE);

        final int clientPoolSizeMin = (Integer) configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
        final int clientPoolSizeMax = (Integer) configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
        final double clientPoolSizeFactor = (Double) configuration.get(AkkaOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);

        final int serverPoolSizeMin = (Integer) configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
        final int serverPoolSizeMax = (Integer) configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
        final double serverPoolSizeFactor = (Double) configuration.get(AkkaOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);

        final String logLifecycleEvents =
                booleanToOnOrOff(configuration.getBoolean(AkkaOptions.LOG_LIFECYCLE_EVENTS));
        final long retryGateClosedFor = configuration.getLong(AkkaOptions.RETRY_GATE_CLOSED_FOR);

        akkaConfigBuilder
                .add("akka {")
                .add("  actor {")
                .add("    provider = \"akka.remote.RemoteActorRefProvider\"")
                .add("  }")
                .add("  remote.artery.enabled = false")
                .add("  remote.startup-timeout = " + startupTimeout)
                .add("  remote.warn-about-direct-use = off")
                .add("  remote.use-unsafe-remote-features-outside-cluster = on")
                .add("  remote.classic {")
                .add("    # disable the transport failure detector by setting very high values")
                .add("    transport-failure-detector{")
                .add("      acceptable-heartbeat-pause = 6000 s")
                .add("      heartbeat-interval = 1000 s")
                .add("      threshold = 300")
                .add("    }")
                .add("    enabled-transports = [\"akka.remote.classic.netty.tcp\"]")
                .add("    netty {")
                .add("      tcp {")
                .add("        transport-class = \"akka.remote.transport.netty.NettyTransport\"")
                .add("        port = " + i2)
                .add("        bind-port = " + i)
                .add("        connection-timeout = " + tcpTimeout)
                .add("        maximum-frame-size = " + maximumFrameSize)
                .add("        tcp-nodelay = on")
                .add("        client-socket-worker-pool {")
                .add("          pool-size-min = " + clientPoolSizeMin)
                .add("          pool-size-max = " + clientPoolSizeMax)
                .add("          pool-size-factor = " + clientPoolSizeFactor)
                .add("        }")
                .add("        server-socket-worker-pool {")
                .add("          pool-size-min = " + serverPoolSizeMin)
                .add("          pool-size-max = " + serverPoolSizeMax)
                .add("          pool-size-factor = " + serverPoolSizeFactor)
                .add("        }")
                .add("      }")
                .add("    }")
                .add("    log-remote-lifecycle-events = " + logLifecycleEvents)
                .add("    retry-gate-closed-for = " + retryGateClosedFor + " ms")
                .add("  }")
                // Plain closing brace of the "akka" block (no dependency on an unrelated constant).
                .add("}");
    }

    /**
     * Appends the Netty TCP hostname section to the given builder.
     *
     * @param akkaConfigBuilder builder the section is appended to
     * @param str hostname written verbatim as {@code bind-hostname}
     * @param str2 hostname that is normalized via {@code NetUtils.unresolvedHostToNormalizedString}
     *     and written as {@code hostname}; a {@code null} normalization result is written as an
     *     empty string
     */
    private static void addHostnameRemoteAkkaConfig(AkkaConfigBuilder akkaConfigBuilder, String str, String str2) {
        final String normalizedHostname = NetUtils.unresolvedHostToNormalizedString(str2);
        // Only null needs mapping: an empty string is already the value we would write.
        final String effectiveHostname = normalizedHostname == null ? "" : normalizedHostname;

        akkaConfigBuilder
                .add("akka {")
                .add("  remote.classic {")
                .add("    netty {")
                .add("      tcp {")
                .add("        hostname = \"" + effectiveHostname + "\"")
                .add("        bind-hostname = \"" + str + "\"")
                .add("      }")
                .add("    }")
                .add("  }")
                // Plain closing brace of the "akka" block (no dependency on an unrelated constant).
                .add("}");
    }

    /**
     * Appends the Netty SSL transport section to the given builder.
     *
     * <p>The SSL transport inherits the TCP transport settings through a substitution and is then
     * extended with key store, trust store, protocol, algorithm and fingerprint settings. For key
     * store and trust store options the internal-SSL option is read first, with the general SSL
     * option as its default.
     *
     * @param akkaConfigBuilder builder the section is appended to
     * @param configuration Flink configuration providing the SSL options
     */
    private static void addSslRemoteAkkaConfig(AkkaConfigBuilder akkaConfigBuilder, Configuration configuration) {
        final String enableSsl =
                booleanToOnOrOff(
                        configuration.getBoolean(AkkaOptions.SSL_ENABLED)
                                && SecurityOptions.isInternalSSLEnabled(configuration));

        final String keyStore =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_KEYSTORE,
                        configuration.getString(SecurityOptions.SSL_KEYSTORE));
        final String keyStorePassword =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
                        configuration.getString(SecurityOptions.SSL_KEYSTORE_PASSWORD));
        final String keyPassword =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
                        configuration.getString(SecurityOptions.SSL_KEY_PASSWORD));
        final String trustStore =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
                        configuration.getString(SecurityOptions.SSL_TRUSTSTORE));
        final String trustStorePassword =
                configuration.getString(
                        SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
                        configuration.getString(SecurityOptions.SSL_TRUSTSTORE_PASSWORD));
        final String certFingerprintsOption =
                configuration.getString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT);

        final String sslProtocol = configuration.getString(SecurityOptions.SSL_PROTOCOL);

        // Comma-separated algorithm names become an unquoted list: a,b -> [a,b]
        final String enabledAlgorithms =
                Arrays.stream(configuration.getString(SecurityOptions.SSL_ALGORITHMS).split(","))
                        .collect(Collectors.joining(",", "[", "]"));

        // Comma-separated fingerprints become a quoted list: a,b -> ["a","b"]; absent -> []
        final String certFingerprints =
                certFingerprintsOption != null
                        ? Arrays.stream(certFingerprintsOption.split(","))
                                .collect(Collectors.joining("\",\"", "[\"", "\"]"))
                        : "[]";

        final String sslEngineProvider = CustomSSLEngineProvider.class.getCanonicalName();

        // NOTE(review): store paths and passwords are placed between double quotes verbatim, so a
        // value containing a double quote or backslash would presumably break parsing - confirm
        // whether such values need escaping.
        akkaConfigBuilder
                .add("akka {")
                .add("  remote.classic {")
                .add("    enabled-transports = [\"akka.remote.classic.netty.ssl\"]")
                .add("    netty {")
                .add("      ssl = ${akka.remote.classic.netty.tcp}")
                .add("      ssl {")
                .add("        enable-ssl = " + enableSsl)
                .add("        ssl-engine-provider = " + sslEngineProvider)
                .add("        security {")
                .add("          key-store = \"" + keyStore + "\"")
                .add("          key-store-password = \"" + keyStorePassword + "\"")
                .add("          key-password = \"" + keyPassword + "\"")
                .add("          trust-store = \"" + trustStore + "\"")
                .add("          trust-store-password = \"" + trustStorePassword + "\"")
                .add("          protocol = " + sslProtocol)
                .add("          enabled-algorithms = " + enabledAlgorithms)
                .add("          random-number-generator = \"\"")
                .add("          require-mutual-authentication = on")
                .add("          cert-fingerprints = " + certFingerprints)
                .add("        }")
                .add("      }")
                .add("    }")
                .add("  }")
                // Plain closing brace of the "akka" block (no dependency on an unrelated constant).
                .add("}");
    }

    /**
     * Creates an actor system without remoting: no external address is passed, so only the base
     * and executor configuration are used.
     *
     * @param configuration Flink configuration to derive the Akka configuration from
     * @return the newly created actor system
     */
    public static ActorSystem createLocalActorSystem(Configuration configuration) {
        final Config localConfig = getAkkaConfig(configuration, null);
        return createActorSystem(localConfig);
    }

    /**
     * Creates an actor system with the name returned by {@code getFlinkActorSystemName()}.
     *
     * @param config Akka configuration for the new actor system
     * @return the newly created actor system
     */
    private static ActorSystem createActorSystem(Config config) {
        final String systemName = getFlinkActorSystemName();
        return createActorSystem(systemName, config);
    }

    /**
     * Creates an actor system with the given name and configuration.
     *
     * @param str name of the actor system
     * @param config Akka configuration for the new actor system
     * @return the actor system created by {@code RobustActorSystem.create}
     */
    public static ActorSystem createActorSystem(String str, Config config) {
        // Install the SLF4J-backed logger factory for Netty before the actor system is created.
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        return RobustActorSystem.create(str, config);
    }

    /**
     * Creates an actor system from the default Akka configuration.
     *
     * @return the newly created actor system
     */
    @VisibleForTesting
    public static ActorSystem createDefaultActorSystem() {
        final Config defaultConfig = getDefaultAkkaConfig();
        return createActorSystem(defaultConfig);
    }

    /**
     * Builds the Akka configuration for an empty Flink configuration with an empty external
     * hostname and port {@code 0}.
     *
     * @return the default Akka configuration
     */
    private static Config getDefaultAkkaConfig() {
        final Configuration emptyConfiguration = new Configuration();
        final HostAndPort defaultAddress = new HostAndPort("", 0);
        return getAkkaConfig(emptyConfiguration, defaultAddress);
    }

    /**
     * Builds the Akka configuration using a fork-join executor derived from the Flink
     * configuration and no explicit bind address.
     *
     * @param configuration Flink configuration to derive the Akka configuration from
     * @param hostAndPort external address, or {@code null} for a configuration without remoting
     * @return the assembled Akka configuration
     */
    public static Config getAkkaConfig(Configuration configuration, @Nullable HostAndPort hostAndPort) {
        final RpcSystem.ForkJoinExecutorConfiguration forkJoinSettings =
                AkkaBootstrapTools.getForkJoinExecutorConfiguration(configuration);
        final Config executorConfig = getForkJoinExecutorConfig(forkJoinSettings);
        return getAkkaConfig(configuration, hostAndPort, null, executorConfig);
    }

    /**
     * Builds the Akka configuration, layering remoting settings (if any) over the base
     * configuration, which in turn falls back to the given executor configuration.
     *
     * @param configuration Flink configuration to derive the Akka configuration from
     * @param hostAndPort external address; when {@code null} no remoting configuration is added
     * @param hostAndPort2 bind address; when {@code null} the wildcard IP address and the external
     *     port are used for binding
     * @param config executor configuration used as fallback for the base configuration
     * @return the assembled Akka configuration
     */
    public static Config getAkkaConfig(Configuration configuration, @Nullable HostAndPort hostAndPort, @Nullable HostAndPort hostAndPort2, Config config) {
        final Config baseConfig = getBasicAkkaConfig(configuration).withFallback(config);

        // Without an external address there is nothing to expose remotely.
        if (hostAndPort == null) {
            return baseConfig;
        }

        final String bindHost;
        final int bindPort;
        if (hostAndPort2 != null) {
            bindHost = hostAndPort2.getHost();
            bindPort = hostAndPort2.getPort();
        } else {
            bindHost = NetUtils.getWildcardIPAddress();
            bindPort = hostAndPort.getPort();
        }

        final Config remoteConfig =
                getRemoteAkkaConfig(configuration, bindHost, bindPort, hostAndPort.getHost(), hostAndPort.getPort());
        return remoteConfig.withFallback(baseConfig);
    }

    /**
     * Returns the address of the given actor system as reported by {@code RemoteAddressExtension}.
     *
     * @param actorSystem actor system to look up the address for
     * @return the address obtained from the extension
     */
    public static Address getAddress(ActorSystem actorSystem) {
        return new RemoteAddressExtension().apply(actorSystem).getAddress();
    }

    /**
     * Returns the path of the given actor rendered with the address of the given actor system.
     *
     * @param actorSystem actor system whose address is used
     * @param actorRef actor whose path is rendered
     * @return the actor path including the actor system's address
     */
    public static String getAkkaURL(ActorSystem actorSystem, ActorRef actorRef) {
        final Address systemAddress = getAddress(actorSystem);
        return actorRef.path().toStringWithAddress(systemAddress);
    }

    /**
     * Parses the address part of the given Akka URL via {@code AddressFromURIString}.
     *
     * @param str Akka URL to parse
     * @return the parsed address
     * @throws MalformedURLException declared for callers to handle unparsable URLs
     */
    public static Address getAddressFromAkkaURL(String str) throws MalformedURLException {
        return AddressFromURIString.apply(str);
    }

    /**
     * Extracts the host and port of the given Akka URL as an {@link InetSocketAddress}.
     *
     * @param str Akka URL to extract the socket address from
     * @return socket address built from the URL's host and port
     * @throws Exception if the URL cannot be parsed or does not define both a host and a port; the
     *     underlying {@link MalformedURLException} is attached as the cause
     */
    public static InetSocketAddress getInetSocketAddressFromAkkaURL(String str) throws Exception {
        try {
            final Address address = getAddressFromAkkaURL(str);
            if (address.host().isDefined() && address.port().isDefined()) {
                return new InetSocketAddress(address.host().get(), ((Integer) address.port().get()).intValue());
            }
            // URLs without host and port cannot be turned into a socket address.
            throw new MalformedURLException("Akka URL does not define both a host and a port: " + str);
        } catch (MalformedURLException e) {
            // Keep the original failure as the cause so the reason is not lost.
            throw new Exception("Could not retrieve InetSocketAddress from Akka URL " + str, e);
        }
    }

    /**
     * Triggers termination of the given actor system.
     *
     * @param actorSystem actor system to terminate
     * @return future derived from the actor system's termination future, with the result value
     *     discarded
     */
    public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) {
        // Convert the future returned by terminate() to a Java future and drop its result value.
        return AkkaFutureUtils.toJava(actorSystem.terminate()).thenAccept(FunctionUtils.ignoreFn());
    }

    /**
     * Converts a boolean to the {@code on}/{@code off} spelling used in the Akka configuration
     * text.
     *
     * @param z value to convert
     * @return {@code "on"} if {@code z} is true, {@code "off"} otherwise
     */
    private static String booleanToOnOrOff(boolean z) {
        // Both spellings are plain literals; no library constant is needed for either.
        return z ? "on" : "off";
    }
}
