package org.apache.flink.runtime.entrypoint;

import akka.actor.ActorSystem;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/entrypoint/ClusterEntrypoint.class */
/**
 * Base class for cluster entry points.
 *
 * <p>{@link #startCluster()} configures the file systems, installs the security context and then,
 * inside that context, creates the shared services (RPC service, high-availability services, blob
 * server, heartbeat services and metric registry) before handing them to
 * {@link #startClusterComponents}. {@link #shutDown(boolean)} tears the services down again.
 *
 * <p>All service fields are only read and written while holding {@code lock}.
 */
public abstract class ClusterEntrypoint implements FatalErrorHandler {

    protected static final Logger LOG = LoggerFactory.getLogger(ClusterEntrypoint.class);

    /** Process exit codes used by entry points. */
    protected static final int SUCCESS_RETURN_CODE = 0;
    protected static final int STARTUP_FAILURE_RETURN_CODE = 1;
    protected static final int RUNTIME_FAILURE_RETURN_CODE = 2;

    /** Configuration the cluster is started with; never {@code null}. */
    private final Configuration configuration;

    /** Guards creation and shutdown of the services below. */
    private final Object lock = new Object();

    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry = null;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices = null;

    @GuardedBy("lock")
    private BlobServer blobServer = null;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices = null;

    @GuardedBy("lock")
    private RpcService commonRpcService = null;

    /** Completed with {@code true} at the end of {@link #shutDown(boolean)}. */
    private final CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();

    /**
     * Creates an entry point for the given configuration.
     *
     * @param configuration cluster configuration, must not be {@code null}
     */
    public ClusterEntrypoint(Configuration configuration) {
        this.configuration = Preconditions.checkNotNull(configuration);
    }

    /**
     * Returns the future that is completed once {@link #shutDown(boolean)} has run through its
     * service shutdown sequence.
     *
     * @return the termination future of this entry point
     */
    public CompletableFuture<Boolean> getTerminationFuture() {
        return terminationFuture;
    }

    /**
     * Starts the cluster: configures file systems, installs the security context and runs
     * {@link #runCluster(Configuration)} inside it.
     *
     * <p>Any failure is logged, a best-effort {@code shutDown(false)} is attempted and the JVM is
     * terminated with {@link #STARTUP_FAILURE_RETURN_CODE}.
     */
    public void startCluster() {
        LOG.info("Starting {}.", getClass().getSimpleName());
        try {
            configureFileSystems(configuration);

            SecurityContext securityContext = installSecurityContext(configuration);

            securityContext.runSecured(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    runCluster(configuration);
                    return null;
                }
            });
        } catch (Throwable t) {
            LOG.error("Cluster initialization failed.", t);

            try {
                shutDown(false);
            } catch (Throwable shutDownFailure) {
                // Only log: the process is about to exit with a failure code anyway.
                LOG.error("Could not properly shut down cluster entrypoint.", shutDownFailure);
            }

            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    }

    /**
     * Initializes the file systems from the given configuration.
     *
     * @param configuration configuration passed to {@code FileSystem.initialize}
     * @throws Exception an {@link IOException} wrapping the original cause if the initialization
     *     fails with an {@link IOException}
     */
    protected void configureFileSystems(Configuration configuration) throws Exception {
        LOG.info("Install default filesystem.");
        try {
            FileSystem.initialize(configuration);
        } catch (IOException e) {
            throw new IOException(
                "Error while setting the default filesystem scheme from configuration.", e);
        }
    }

    /**
     * Installs the security configuration derived from the given configuration.
     *
     * @param configuration configuration used to build the {@link SecurityConfiguration}
     * @return the security context reported by {@code SecurityUtils.getInstalledContext()}
     * @throws Exception if installing the security configuration fails
     */
    protected SecurityContext installSecurityContext(Configuration configuration) throws Exception {
        LOG.info("Install security context.");
        SecurityUtils.install(new SecurityConfiguration(configuration));
        return SecurityUtils.getInstalledContext();
    }

    /**
     * Creates the shared services and starts the cluster components while holding the lock.
     *
     * <p>The given configuration is modified: the job manager address and port are overwritten
     * with the values reported by the started RPC service.
     *
     * @param configuration cluster configuration (mutated, see above)
     * @throws Exception if a service or a cluster component cannot be started
     */
    protected void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            initializeServices(configuration);

            // Publish the address/port the RPC service actually reports.
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            startClusterComponents(
                configuration,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry);
        }
    }

    /**
     * Creates and starts the shared services. Must be called while holding {@code lock}.
     *
     * <p>NOTE(review): the RPC service is cast to {@link AkkaRpcService} to obtain the actor
     * system for the metric query service, so an overridden
     * {@link #createRpcService(Configuration, String, String)} returning another implementation
     * would fail here with a {@link ClassCastException}.
     *
     * @param configuration cluster configuration
     * @throws Exception if one of the services cannot be created or started
     */
    protected void initializeServices(Configuration configuration) throws Exception {
        assert Thread.holdsLock(lock);

        LOG.info("Initializing cluster services.");

        final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
        final String port = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));

        commonRpcService = createRpcService(configuration, bindAddress, port);
        haServices = createHaServices(configuration, commonRpcService.getExecutor());

        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();

        heartbeatServices = createHeartbeatServices(configuration);

        metricRegistry = createMetricRegistry(configuration);
        ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
        metricRegistry.startQueryService(actorSystem, null);
    }

    /**
     * Starts an actor system and wraps it in an {@link AkkaRpcService}.
     *
     * @param configuration cluster configuration
     * @param bindAddress address handed to {@code BootstrapTools.startActorSystem}
     * @param port port (as string) handed to {@code BootstrapTools.startActorSystem}
     * @return the RPC service, using the timeout from {@code AkkaUtils.getTimeout}
     * @throws Exception if the actor system cannot be started
     */
    protected RpcService createRpcService(
            Configuration configuration, String bindAddress, String port) throws Exception {
        ActorSystem actorSystem =
            BootstrapTools.startActorSystem(configuration, bindAddress, port, LOG);
        FiniteDuration duration = AkkaUtils.getTimeout(configuration);
        return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
    }

    /**
     * Creates the high-availability services without address resolution.
     *
     * @param configuration cluster configuration
     * @param executor executor handed to the high-availability services
     * @return the created high-availability services
     * @throws Exception if the services cannot be created
     */
    protected HighAvailabilityServices createHaServices(
            Configuration configuration, Executor executor) throws Exception {
        return HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            executor,
            HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    }

    /**
     * Creates the heartbeat services from the given configuration.
     *
     * @param configuration cluster configuration
     * @return the heartbeat services
     */
    protected HeartbeatServices createHeartbeatServices(Configuration configuration) {
        return HeartbeatServices.fromConfiguration(configuration);
    }

    /**
     * Creates the metric registry from the given configuration.
     *
     * @param configuration cluster configuration
     * @return the metric registry
     */
    protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration));
    }

    /**
     * Stops the cluster components and then every service that was created, in the order metric
     * registry, blob server, high-availability services, RPC service. Each step is attempted even
     * if an earlier one failed, and the termination future is completed afterwards in any case.
     *
     * @param cleanupHaData if {@code true}, the high-availability services are closed via
     *     {@code closeAndCleanupAllData()} instead of {@code close()}
     * @throws FlinkException if at least one step failed; the failures are combined through
     *     {@code ExceptionUtils.firstOrSuppressed}
     */
    public void shutDown(boolean cleanupHaData) throws FlinkException {
        LOG.info("Stopping {}.", getClass().getSimpleName());

        Throwable exception = null;

        synchronized (lock) {
            try {
                stopClusterComponents(cleanupHaData);
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }

            if (metricRegistry != null) {
                try {
                    metricRegistry.shutdown();
                } catch (Throwable t) {
                    // Combine with, rather than overwrite, a failure recorded by an earlier step.
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            if (blobServer != null) {
                try {
                    blobServer.close();
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            if (haServices != null) {
                try {
                    if (cleanupHaData) {
                        haServices.closeAndCleanupAllData();
                    } else {
                        haServices.close();
                    }
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            if (commonRpcService != null) {
                try {
                    commonRpcService.stopService();
                } catch (Throwable t) {
                    exception = ExceptionUtils.firstOrSuppressed(t, exception);
                }
            }

            terminationFuture.complete(true);
        }

        if (exception != null) {
            throw new FlinkException("Could not properly shut down the cluster services.", exception);
        }
    }

    /**
     * Logs the fatal error and terminates the JVM with {@link #RUNTIME_FAILURE_RETURN_CODE}.
     *
     * @param exception the fatal error
     */
    @Override
    public void onFatalError(Throwable exception) {
        LOG.error("Fatal error occurred in the cluster entrypoint.", exception);
        System.exit(RUNTIME_FAILURE_RETURN_CODE);
    }

    /**
     * Starts the components specific to the concrete cluster type. Called from
     * {@link #runCluster(Configuration)} while the lock is held, after all services were created.
     *
     * @param configuration cluster configuration, already updated with the RPC address and port
     * @param rpcService the common RPC service
     * @param highAvailabilityServices the high-availability services
     * @param blobServer the started blob server
     * @param heartbeatServices the heartbeat services
     * @param metricRegistry the metric registry
     * @throws Exception if a component cannot be started
     */
    protected abstract void startClusterComponents(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry) throws Exception;

    /**
     * Stops the components started by {@link #startClusterComponents}. The default implementation
     * does nothing. Called from {@link #shutDown(boolean)} while the lock is held.
     *
     * @param cleanupHaData the flag that was passed to {@link #shutDown(boolean)}
     * @throws Exception if a component cannot be stopped
     */
    protected void stopClusterComponents(boolean cleanupHaData) throws Exception {
    }

    /**
     * Builds a {@link ClusterConfiguration} from command line arguments, reading the
     * {@code configDir} parameter and falling back to the empty string.
     *
     * @param args command line arguments
     * @return the parsed cluster configuration
     */
    public static ClusterConfiguration parseArguments(String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String configDir = parameterTool.get("configDir", "");
        return new ClusterConfiguration(configDir);
    }

    /**
     * Loads the configuration from the config directory of the given cluster configuration.
     *
     * @param clusterConfiguration provides the configuration directory
     * @return the loaded configuration
     */
    public static Configuration loadConfiguration(ClusterConfiguration clusterConfiguration) {
        return GlobalConfiguration.loadConfiguration(clusterConfiguration.getConfigDir());
    }
}
