package org.apache.flink.runtime.minicluster;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster.class */
public class MiniCluster {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger LOG;
    /** Configuration handed to the constructor; checked there to be non-null. */
    private final MiniClusterConfiguration miniClusterConfiguration;

    /** Metric registry; created in start() and shut down in shutdownInternally(). */
    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    /** RPC service that start() always creates; in SHARED mode every component uses it. */
    @GuardedBy("lock")
    private RpcService commonRpcService;

    /** Dedicated JobManager RPC services; left null by start() when the RPC service is shared. */
    @GuardedBy("lock")
    private RpcService[] jobManagerRpcServices;

    /** Dedicated TaskManager RPC services; left null by start() when the RPC service is shared. */
    @GuardedBy("lock")
    private RpcService[] taskManagerRpcServices;

    /** Dedicated ResourceManager RPC services; left null by start() when the RPC service is shared. */
    @GuardedBy("lock")
    private RpcService[] resourceManagerRpcServices;

    /** High-availability services; created in start(), closed and cleaned up in shutdownInternally(). */
    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    /** Blob server; constructed and started in start(), closed in shutdownInternally(). */
    @GuardedBy("lock")
    private BlobServer blobServer;

    /** Heartbeat services built from the configuration in start(). */
    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    /** Runners returned by startResourceManagers(); each is shut down in shutdownInternally(). */
    @GuardedBy("lock")
    private ResourceManagerRunner[] resourceManagerRunners;
    /** Task executors returned by startTaskManagers(); volatile because the fatal error handler reads it without holding the lock. */
    private volatile TaskExecutor[] taskManagers;

    /** Job dispatcher created as the last component in start(); used by runDetached() and runJobBlocking(). */
    @GuardedBy("lock")
    private MiniClusterJobDispatcher jobDispatcher;
    // NOTE(review): marked synthetic by the decompiler -- presumably the compiler-generated assertion flag.
    // It is assigned in the static initializer and consulted by the lock check in shutdownInternally().
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Monitor guarding the fields annotated with @GuardedBy("lock"). */
    private final Object lock = new Object();
    /** Set to true at the end of a successful start() and reset to false by shutdown(). */
    private volatile boolean running = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$TerminatingFatalErrorHandler.class */
    public class TerminatingFatalErrorHandler implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int i) {
            this.index = i;
        }

        @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
        public void onFatalError(Throwable th) {
            if (MiniCluster.this.running) {
                MiniCluster.LOG.error("TaskManager #{} failed.", Integer.valueOf(this.index), th);
                TaskExecutor[] taskExecutorArr = MiniCluster.this.taskManagers;
                if (taskExecutorArr != null) {
                    taskExecutorArr[this.index].shutDown();
                }
            }
        }
    }

    /**
     * Creates a mini cluster for the given configuration. Nothing is started until
     * {@link #start()} is called.
     *
     * @param miniClusterConfiguration the configuration to use; must not be null
     */
    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        Preconditions.checkNotNull(miniClusterConfiguration, "config may not be null");
        this.miniClusterConfiguration = miniClusterConfiguration;
    }

    /**
     * Reports the current value of the volatile {@code running} flag.
     *
     * @return {@code true} once {@link #start()} has completed successfully and until
     *     {@link #shutdown()} resets the flag
     */
    public boolean isRunning() {
        return running;
    }

    /**
     * Starts the mini cluster: metric registry, RPC service(s), high-availability services,
     * blob server, heartbeat services, resource managers, task managers and finally the job
     * dispatcher. The whole start-up runs while holding {@code lock}.
     *
     * <p>If any step fails, everything started so far is torn down through
     * {@code shutdownInternally()} and the original exception is rethrown; a failure of that
     * cleanup is attached to it as a suppressed exception.
     *
     * @throws IllegalStateException if the mini cluster is already running
     * @throws Exception if a component fails to start
     */
    public void start() throws Exception {
        synchronized (this.lock) {
            Preconditions.checkState(!this.running, "FlinkMiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", this.miniClusterConfiguration);

            final UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            final Time rpcTimeout = this.miniClusterConfiguration.getRpcTimeout();
            final int numJobManagers = this.miniClusterConfiguration.getNumJobManagers();
            final int numTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
            final int numResourceManagers = this.miniClusterConfiguration.getNumResourceManagers();
            final boolean singleRpcService =
                    this.miniClusterConfiguration.getRpcServiceSharing() == MiniClusterConfiguration.RpcServiceSharing.SHARED;

            try {
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = createMetricRegistry(configuration);

                final RpcService[] jobManagerRpcs = new RpcService[numJobManagers];
                final RpcService[] taskManagerRpcs = new RpcService[numTaskManagers];
                final RpcService[] resourceManagerRpcs = new RpcService[numResourceManagers];

                LOG.info("Starting RPC Service(s)");
                // The common RPC service is always created, independent of the sharing mode.
                this.commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
                this.metricRegistry.startQueryService(((AkkaRpcService) this.commonRpcService).getActorSystem(), null);

                if (singleRpcService) {
                    // Every component uses the common service; it is stopped exactly once via
                    // commonRpcService, so the per-component fields stay null.
                    for (int i = 0; i < numJobManagers; i++) {
                        jobManagerRpcs[i] = this.commonRpcService;
                    }
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcs[i] = this.commonRpcService;
                    }
                    for (int i = 0; i < numResourceManagers; i++) {
                        resourceManagerRpcs[i] = this.commonRpcService;
                    }
                    this.resourceManagerRpcServices = null;
                    this.jobManagerRpcServices = null;
                    this.taskManagerRpcServices = null;
                } else {
                    final String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    final String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    final String resourceManagerBindAddress = this.miniClusterConfiguration.getResourceManagerBindAddress();

                    // Publish the arrays BEFORE filling them: if creating one of the services
                    // fails, the cleanup in the catch block can still reach (and stop) the
                    // services created so far. Unfilled slots are null and are skipped by
                    // shutDownRpcs().
                    this.jobManagerRpcServices = jobManagerRpcs;
                    this.taskManagerRpcServices = taskManagerRpcs;
                    this.resourceManagerRpcServices = resourceManagerRpcs;

                    for (int i = 0; i < numJobManagers; i++) {
                        jobManagerRpcs[i] = createRpcService(configuration, rpcTimeout, true, jobManagerBindAddress);
                    }
                    for (int i = 0; i < numTaskManagers; i++) {
                        taskManagerRpcs[i] = createRpcService(configuration, rpcTimeout, true, taskManagerBindAddress);
                    }
                    for (int i = 0; i < numResourceManagers; i++) {
                        resourceManagerRpcs[i] = createRpcService(configuration, rpcTimeout, true, resourceManagerBindAddress);
                    }
                }

                LOG.info("Starting high-availability services");
                this.haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, this.commonRpcService.getExecutor());
                this.blobServer = new BlobServer(configuration, this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

                LOG.info("Starting {} ResourceManger(s)", Integer.valueOf(numResourceManagers));
                this.resourceManagerRunners = startResourceManagers(configuration, this.haServices, this.heartbeatServices, this.metricRegistry, numResourceManagers, resourceManagerRpcs);

                LOG.info("Starting {} TaskManger(s)", Integer.valueOf(numTaskManagers));
                this.taskManagers = startTaskManagers(configuration, this.haServices, this.metricRegistry, numTaskManagers, taskManagerRpcs);

                LOG.info("Starting job dispatcher(s) for {} JobManger(s)", Integer.valueOf(numJobManagers));
                this.jobDispatcher = new MiniClusterJobDispatcher(configuration, this.haServices, this.blobServer, this.heartbeatServices, this.metricRegistry, numJobManagers, jobManagerRpcs);

                // Only flip the flag once every component came up.
                this.running = true;
                LOG.info("Flink Mini Cluster started successfully");
            } catch (Exception e) {
                // Best-effort cleanup of whatever was started; never mask the original failure.
                try {
                    shutdownInternally();
                } catch (Exception cleanupFailure) {
                    e.addSuppressed(cleanupFailure);
                }
                throw e;
            }
        }
    }

    /**
     * Shuts the mini cluster down if it is currently running; otherwise does nothing.
     *
     * <p>The {@code running} flag is reset whether or not the internal shutdown succeeds.
     *
     * @throws Exception if shutting down one of the components failed
     */
    public void shutdown() throws Exception {
        synchronized (lock) {
            if (!running) {
                return;
            }

            LOG.info("Shutting down Flink Mini Cluster");
            try {
                shutdownInternally();
            } finally {
                // Reset on success and on failure alike.
                running = false;
            }
            LOG.info("Flink Mini Cluster is shut down");
        }
    }

    /**
     * Tears down every component that is currently set, in this order: job dispatcher, resource
     * manager runners, task managers (waiting for each termination future), metric registry,
     * RPC services (common first, then job manager, task manager and resource manager services),
     * blob server and high-availability services. Each field is reset to null afterwards.
     *
     * <p>The teardown is best effort: a failure in one step is remembered and the remaining
     * steps still run. Failures are combined with {@code ExceptionUtils.firstOrSuppressed} and
     * rethrown at the end.
     *
     * <p>Must be called while holding {@code lock}.
     *
     * @throws Exception if at least one component failed to shut down
     */
    @GuardedBy("lock")
    private void shutdownInternally() throws Exception {
        // Explicit form of "assert Thread.holdsLock(lock)"; kept this way because the class
        // declares the $assertionsDisabled flag itself.
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }

        // Typed as Throwable: the steps below catch and accumulate Throwables, not only Exceptions.
        Throwable exception = null;

        if (this.jobDispatcher != null) {
            try {
                this.jobDispatcher.shutdown();
            } catch (Exception e) {
                exception = e;
            }
            this.jobDispatcher = null;
        }

        if (this.resourceManagerRunners != null) {
            for (ResourceManagerRunner runner : this.resourceManagerRunners) {
                if (runner != null) {
                    try {
                        runner.shutDown();
                    } catch (Throwable t) {
                        exception = ExceptionUtils.firstOrSuppressed(t, exception);
                    }
                }
            }
            this.resourceManagerRunners = null;
        }

        if (this.taskManagers != null) {
            for (TaskExecutor taskExecutor : this.taskManagers) {
                if (taskExecutor != null) {
                    try {
                        taskExecutor.shutDown();
                        // Block until this task executor's termination future completes.
                        taskExecutor.getTerminationFuture().get();
                    } catch (Throwable t) {
                        exception = ExceptionUtils.firstOrSuppressed(t, exception);
                    }
                }
            }
            this.taskManagers = null;
        }

        if (this.metricRegistry != null) {
            // Guarded like every other step so that a failing registry cannot prevent the
            // RPC services, blob server and HA services from being released.
            try {
                this.metricRegistry.shutdown();
            } catch (Throwable t) {
                exception = ExceptionUtils.firstOrSuppressed(t, exception);
            }
            this.metricRegistry = null;
        }

        // Stop the RPC services: the common one first, then the per-component ones.
        exception = shutDownRpc(this.commonRpcService, exception);
        exception = shutDownRpcs(this.jobManagerRpcServices, exception);
        exception = shutDownRpcs(this.taskManagerRpcServices, exception);
        exception = shutDownRpcs(this.resourceManagerRpcServices, exception);
        this.commonRpcService = null;
        this.jobManagerRpcServices = null;
        this.taskManagerRpcServices = null;
        this.resourceManagerRpcServices = null;

        if (this.blobServer != null) {
            try {
                this.blobServer.close();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            this.blobServer = null;
        }

        if (this.haServices != null) {
            try {
                this.haServices.closeAndCleanupAllData();
            } catch (Exception e) {
                exception = ExceptionUtils.firstOrSuppressed(e, exception);
            }
            this.haServices = null;
        }

        if (exception != null) {
            ExceptionUtils.rethrowException(exception, "Error while shutting down mini cluster");
        }
    }

    /**
     * Blocks until the leading ResourceManager reports at least as many registered TaskManagers
     * as this mini cluster started.
     *
     * <p>The method retrieves the ResourceManager leader through the high-availability services,
     * connects to it via the common RPC service and then polls the number of registered
     * TaskManagers. The leader retrieval service is always stopped before returning.
     *
     * @throws IllegalStateException if the mini cluster is not running
     * @throws Exception if leader retrieval, the RPC connection or the polling fails, or if the
     *     waiting thread is interrupted
     */
    public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
        LeaderRetrievalService resourceManagerLeaderRetriever = null;
        try {
            final CompletableFuture<LeaderAddressAndId> leaderFuture;
            final RpcService rpcService;
            final int expectedTaskManagers;

            synchronized (this.lock) {
                Preconditions.checkState(this.running, "FlinkMiniCluster is not running");

                final OneTimeLeaderListenerFuture leaderListener = new OneTimeLeaderListenerFuture();
                resourceManagerLeaderRetriever = this.haServices.getResourceManagerLeaderRetriever();
                resourceManagerLeaderRetriever.start(leaderListener);
                leaderFuture = leaderListener.future();

                // Snapshot the shared state while holding the lock: a concurrent shutdown()
                // nulls these fields, and the blocking calls below run without the lock.
                rpcService = this.commonRpcService;
                expectedTaskManagers = this.taskManagers.length;
            }

            final LeaderAddressAndId leader = leaderFuture.get();
            final ResourceManagerGateway resourceManager = (ResourceManagerGateway) rpcService
                    .connect(leader.leaderAddress(), new ResourceManagerId(leader.leaderId()), ResourceManagerGateway.class)
                    .get();

            // Poll until enough TaskManagers have registered.
            while (resourceManager.getNumberOfRegisteredTaskManagers().get().intValue() < expectedTaskManagers) {
                Thread.sleep(2L);
            }
        } finally {
            if (resourceManagerLeaderRetriever != null) {
                try {
                    resourceManagerLeaderRetriever.stop();
                } catch (Exception e) {
                    // Best effort: a failure to stop the listener must not mask the outcome above.
                    LOG.warn("Error shutting down leader listener for ResourceManager", e);
                }
            }
        }
    }

    /**
     * Hands the job to the job dispatcher's {@code runDetached} while holding {@code lock}.
     *
     * @param jobGraph the job to run; must not be null
     * @throws JobExecutionException declared by the dispatcher call
     * @throws IllegalStateException if the mini cluster is not running
     */
    public void runDetached(JobGraph jobGraph) throws JobExecutionException {
        Preconditions.checkNotNull(jobGraph, "job is null");

        synchronized (lock) {
            Preconditions.checkState(running, "mini cluster is not running");
            final MiniClusterJobDispatcher dispatcher = jobDispatcher;
            dispatcher.runDetached(jobGraph);
        }
    }

    /**
     * Runs the job through the job dispatcher's {@code runJobBlocking} and returns its result.
     *
     * <p>The dispatcher reference is read under {@code lock}; the dispatcher call itself is
     * made after the lock has been released.
     *
     * @param jobGraph the job to run; must not be null
     * @return the result returned by the dispatcher
     * @throws JobExecutionException declared by the dispatcher call
     * @throws InterruptedException declared by the dispatcher call
     * @throws IllegalStateException if the mini cluster is not running
     */
    public JobExecutionResult runJobBlocking(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");

        final MiniClusterJobDispatcher dispatcher;
        synchronized (lock) {
            Preconditions.checkState(running, "mini cluster is not running");
            dispatcher = jobDispatcher;
        }

        return dispatcher.runJobBlocking(jobGraph);
    }

    /**
     * Factory method for the metric registry used by this mini cluster.
     *
     * @param configuration the configuration from which the registry configuration is derived
     * @return a new metric registry
     */
    protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
        final MetricRegistryConfiguration registryConfiguration =
                MetricRegistryConfiguration.fromConfiguration(configuration);
        return new MetricRegistryImpl(registryConfiguration);
    }

    /**
     * Factory method for an Akka-based RPC service.
     *
     * @param configuration the configuration passed to the actor system factory
     * @param time the timeout handed to the {@code AkkaRpcService} constructor
     * @param z if {@code true}, the actor system is created with {@code AkkaUtils.createActorSystem}
     *     for the given address and port 0; otherwise {@code AkkaUtils.createLocalActorSystem} is used
     * @param str the address for the actor system; only used when {@code z} is {@code true}
     * @return the newly created RPC service
     */
    protected RpcService createRpcService(Configuration configuration, Time time, boolean z, String str) {
        if (z) {
            return new AkkaRpcService(AkkaUtils.createActorSystem(configuration, str, 0), time);
        }
        return new AkkaRpcService(AkkaUtils.createLocalActorSystem(configuration), time);
    }

    /**
     * Creates and starts {@code i} resource manager runners, one after the other.
     *
     * @param configuration the configuration passed to every runner
     * @param highAvailabilityServices the HA services passed to every runner
     * @param heartbeatServices the heartbeat services passed to every runner
     * @param metricRegistry the metric registry passed to every runner
     * @param i the number of resource managers to start
     * @param rpcServiceArr the RPC service for each runner, looked up by runner index
     * @return the started runners, in index order
     * @throws Exception if creating or starting a runner fails
     */
    protected ResourceManagerRunner[] startResourceManagers(Configuration configuration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, int i, RpcService[] rpcServiceArr) throws Exception {
        final ResourceManagerRunner[] runners = new ResourceManagerRunner[i];

        for (int idx = 0; idx < i; idx++) {
            final ResourceManagerRunner runner = new ResourceManagerRunner(
                    ResourceID.generate(),
                    "resourcemanager_" + idx,
                    configuration,
                    rpcServiceArr[idx],
                    highAvailabilityServices,
                    heartbeatServices,
                    metricRegistry);
            runners[idx] = runner;
            runner.start();
        }

        return runners;
    }

    /**
     * Creates and starts {@code i} task executors, one after the other. Each one gets a random
     * resource ID and its own {@code TerminatingFatalErrorHandler}.
     *
     * <p>The heartbeat services are taken from this instance's {@code heartbeatServices} field
     * rather than from a parameter.
     *
     * @param configuration the configuration passed to every task manager
     * @param highAvailabilityServices the HA services passed to every task manager
     * @param metricRegistry the metric registry passed to every task manager
     * @param i the number of task managers to start
     * @param rpcServiceArr the RPC service for each task manager, looked up by index
     * @return the started task executors, in index order
     * @throws Exception if creating or starting a task manager fails
     */
    protected TaskExecutor[] startTaskManagers(Configuration configuration, HighAvailabilityServices highAvailabilityServices, MetricRegistry metricRegistry, int i, RpcService[] rpcServiceArr) throws Exception {
        final TaskExecutor[] executors = new TaskExecutor[i];
        // Flag forwarded to TaskManagerRunner.startTaskManager; true only for exactly one task manager.
        final boolean singleTaskManager = (i == 1);

        for (int idx = 0; idx < i; idx++) {
            final TaskExecutor executor = TaskManagerRunner.startTaskManager(
                    configuration,
                    new ResourceID(UUID.randomUUID().toString()),
                    rpcServiceArr[idx],
                    highAvailabilityServices,
                    this.heartbeatServices,
                    metricRegistry,
                    singleTaskManager,
                    new TerminatingFatalErrorHandler(idx));
            executors[idx] = executor;
            executor.start();
        }

        return executors;
    }

    /**
     * Stops a single RPC service, if present, and folds a failure into the prior exception.
     *
     * @param rpcService the service to stop; may be null, in which case nothing happens
     * @param th the exception accumulated so far; may be null
     * @return {@code th} if the service was null or stopped cleanly, otherwise the result of
     *     {@code ExceptionUtils.firstOrSuppressed} for the new failure and {@code th}
     */
    private static Throwable shutDownRpc(RpcService rpcService, Throwable th) {
        if (rpcService == null) {
            return th;
        }

        try {
            rpcService.stopService();
            return th;
        } catch (Throwable failure) {
            return ExceptionUtils.firstOrSuppressed(failure, th);
        }
    }

    /**
     * Stops every non-null RPC service in the array and folds any failures into the prior
     * exception. A failure to stop one service does not prevent the remaining services from
     * being stopped.
     *
     * @param rpcServiceArr the services to stop; may be null, and may contain null entries
     * @param th the exception accumulated so far; may be null
     * @return {@code th} if the array was null or every service stopped cleanly, otherwise the
     *     combination of {@code th} and all failures built with
     *     {@code ExceptionUtils.firstOrSuppressed}
     */
    private static Throwable shutDownRpcs(RpcService[] rpcServiceArr, Throwable th) {
        Throwable exception = th;

        if (rpcServiceArr != null) {
            for (RpcService rpcService : rpcServiceArr) {
                if (rpcService != null) {
                    try {
                        rpcService.stopService();
                    } catch (Throwable failure) {
                        exception = ExceptionUtils.firstOrSuppressed(failure, exception);
                    }
                }
            }
        }

        // Return the accumulated exception, not the untouched input: otherwise every failure
        // recorded in the loop above would be silently discarded.
        return exception;
    }

    static {
        $assertionsDisabled = !MiniCluster.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(MiniCluster.class);
    }
}
