package org.apache.flink.runtime.resourcemanager;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.dump.MetricQueryService;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManager.class */
public abstract class ResourceManager<WorkerType extends Serializable> extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway, LeaderContender {
    public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    private final ResourceID resourceId;
    private final ResourceManagerConfiguration resourceManagerConfiguration;
    private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;
    private final Map<ResourceID, JobManagerRegistration> jmResourceIdRegistrations;
    private final JobLeaderIdService jobLeaderIdService;
    private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
    private final HighAvailabilityServices highAvailabilityServices;
    private final HeartbeatManager<SlotReport, Void> taskManagerHeartbeatManager;
    private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
    private final MetricRegistry metricRegistry;
    private final FatalErrorHandler fatalErrorHandler;
    private final SlotManager slotManager;
    private LeaderElectionService leaderElectionService;
    private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManager$JobLeaderIdActionsImpl.class */
    private class JobLeaderIdActionsImpl implements JobLeaderIdActions {
        private JobLeaderIdActionsImpl() {
        }

        @Override // org.apache.flink.runtime.resourcemanager.JobLeaderIdActions
        public void jobLeaderLostLeadership(final JobID jobID, final JobMasterId jobMasterId) {
            ResourceManager.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.JobLeaderIdActionsImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    ResourceManager.this.jobLeaderLostLeadership(jobID, jobMasterId);
                }
            });
        }

        @Override // org.apache.flink.runtime.resourcemanager.JobLeaderIdActions
        public void notifyJobTimeout(final JobID jobID, final UUID uuid) {
            ResourceManager.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.JobLeaderIdActionsImpl.2
                @Override // java.lang.Runnable
                public void run() {
                    if (ResourceManager.this.jobLeaderIdService.isValidTimeout(jobID, uuid)) {
                        ResourceManager.this.removeJob(jobID);
                    }
                }
            });
        }

        @Override // org.apache.flink.runtime.resourcemanager.JobLeaderIdActions
        public void handleError(Throwable th) {
            ResourceManager.this.onFatalError(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManager$JobManagerHeartbeatListener.class */
    private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
        private JobManagerHeartbeatListener() {
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            ResourceManager.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.JobManagerHeartbeatListener.1
                @Override // java.lang.Runnable
                public void run() {
                    JobManagerRegistration jobManagerRegistration;
                    ResourceManager.this.log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
                    if (!ResourceManager.this.jmResourceIdRegistrations.containsKey(resourceID) || (jobManagerRegistration = (JobManagerRegistration) ResourceManager.this.jmResourceIdRegistrations.get(resourceID)) == null) {
                        return;
                    }
                    ResourceManager.this.closeJobManagerConnection(jobManagerRegistration.getJobID(), new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out."));
                }
            });
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public CompletableFuture<Void> retrievePayload() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManager$ResourceActionsImpl.class */
    private class ResourceActionsImpl implements ResourceActions {
        private ResourceActionsImpl() {
        }

        @Override // org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions
        public void releaseResource(InstanceID instanceID) {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.releaseResource(instanceID);
        }

        @Override // org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions
        public void allocateResource(ResourceProfile resourceProfile) throws ResourceManagerException {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.startNewWorker(resourceProfile);
        }

        @Override // org.apache.flink.runtime.resourcemanager.slotmanager.ResourceActions
        public void notifyAllocationFailure(JobID jobID, AllocationID allocationID, Exception exc) {
            ResourceManager.this.validateRunsInMainThread();
            ResourceManager.this.log.info("Slot request with allocation id {} for job {} failed.", new Object[]{allocationID, jobID, exc});
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManager$TaskManagerHeartbeatListener.class */
    private class TaskManagerHeartbeatListener implements HeartbeatListener<SlotReport, Void> {
        private TaskManagerHeartbeatListener() {
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            ResourceManager.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.TaskManagerHeartbeatListener.1
                @Override // java.lang.Runnable
                public void run() {
                    ResourceManager.this.log.info("The heartbeat of TaskManager with id {} timed out.", resourceID);
                    ResourceManager.this.closeTaskManagerConnection(resourceID, new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
                }
            });
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public void reportPayload(final ResourceID resourceID, final SlotReport slotReport) {
            ResourceManager.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.TaskManagerHeartbeatListener.2
                @Override // java.lang.Runnable
                public void run() {
                    ResourceManager.this.log.debug("Received new slot report from TaskManager {}.", resourceID);
                    WorkerRegistration workerRegistration = (WorkerRegistration) ResourceManager.this.taskExecutors.get(resourceID);
                    if (workerRegistration == null) {
                        ResourceManager.this.log.debug("Received slot report from TaskManager {} which is no longer registered.", resourceID);
                    } else {
                        ResourceManager.this.slotManager.reportSlotStatus(workerRegistration.getInstanceID(), slotReport);
                    }
                }
            });
        }

        @Override // org.apache.flink.runtime.heartbeat.HeartbeatListener
        public CompletableFuture<Void> retrievePayload() {
            return CompletableFuture.completedFuture(null);
        }
    }

    public ResourceManager(RpcService rpcService, String str, ResourceID resourceID, ResourceManagerConfiguration resourceManagerConfiguration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, SlotManager slotManager, MetricRegistry metricRegistry, JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) {
        super(rpcService, str);
        this.resourceId = (ResourceID) Preconditions.checkNotNull(resourceID);
        this.resourceManagerConfiguration = (ResourceManagerConfiguration) Preconditions.checkNotNull(resourceManagerConfiguration);
        this.highAvailabilityServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices);
        this.slotManager = (SlotManager) Preconditions.checkNotNull(slotManager);
        this.metricRegistry = (MetricRegistry) Preconditions.checkNotNull(metricRegistry);
        this.jobLeaderIdService = (JobLeaderIdService) Preconditions.checkNotNull(jobLeaderIdService);
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceID, new TaskManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(resourceID, new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        this.jobManagerRegistrations = new HashMap(4);
        this.jmResourceIdRegistrations = new HashMap(4);
        this.taskExecutors = new HashMap(8);
        this.infoMessageListeners = new ConcurrentHashMap(8);
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void start() throws Exception {
        super.start();
        this.leaderElectionService = this.highAvailabilityServices.getResourceManagerLeaderElectionService();
        try {
            this.leaderElectionService.start(this);
            try {
                this.jobLeaderIdService.start(new JobLeaderIdActionsImpl());
                initialize();
            } catch (Exception e) {
                throw new ResourceManagerException("Could not start the job leader id service.", e);
            }
        } catch (Exception e2) {
            throw new ResourceManagerException("Could not start the leader election service.", e2);
        }
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void postStop() throws Exception {
        Exception exc = null;
        this.taskManagerHeartbeatManager.stop();
        this.jobManagerHeartbeatManager.stop();
        try {
            this.slotManager.close();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, null);
        }
        try {
            this.leaderElectionService.stop();
        } catch (Exception e2) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
        }
        try {
            this.jobLeaderIdService.stop();
        } catch (Exception e3) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e3, exc);
        }
        clearState();
        try {
            super.postStop();
        } catch (Exception e4) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e4, exc);
        }
        if (exc != null) {
            ExceptionUtils.rethrowException(exc, "Error while shutting the ResourceManager down.");
        }
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID resourceID, String str, JobID jobID, Time time) {
        Preconditions.checkNotNull(jobMasterId);
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(jobID);
        if (!this.jobLeaderIdService.containsJob(jobID)) {
            try {
                this.jobLeaderIdService.addJob(jobID);
            } catch (Exception e) {
                ResourceManagerException resourceManagerException = new ResourceManagerException("Could not add the job " + jobID + " to the job id leader service.", e);
                onFatalError(resourceManagerException);
                this.log.error("Could not add job {} to job leader id service.", jobID, e);
                return FutureUtils.completedExceptionally(resourceManagerException);
            }
        }
        this.log.info("Registering job manager {}@{} for job {}.", new Object[]{jobMasterId, str, jobID});
        try {
            return getRpcService().connect(str, jobMasterId, JobMasterGateway.class).thenCombineAsync((CompletionStage) this.jobLeaderIdService.getLeaderId(jobID), (jobMasterGateway, jobMasterId2) -> {
                if (Objects.equals(jobMasterId2, jobMasterId)) {
                    return registerJobMasterInternal(jobMasterGateway, jobID, str, resourceID);
                }
                this.log.debug("The current JobMaster leader id {} did not match the received JobMaster id {}.", jobMasterId, jobMasterId2);
                return new RegistrationResponse.Decline("Job manager leader id did not match.");
            }, (Executor) getMainThreadExecutor()).handleAsync((registrationResponse, th) -> {
                if (th == null) {
                    return registrationResponse;
                }
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Registration of job manager {}@{} failed.", new Object[]{jobMasterId, str, th});
                } else {
                    this.log.info("Registration of job manager {}@{} failed.", jobMasterId, str);
                }
                return new RegistrationResponse.Decline(th.getMessage());
            }, getRpcService().getExecutor());
        } catch (Exception e2) {
            ResourceManagerException resourceManagerException2 = new ResourceManagerException("Cannot obtain the job leader id future to verify the correct job leader.", e2);
            onFatalError(resourceManagerException2);
            this.log.debug("Could not obtain the job leader id future to verify the correct job leader.");
            return FutureUtils.completedExceptionally(resourceManagerException2);
        }
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public CompletableFuture<RegistrationResponse> registerTaskExecutor(String str, ResourceID resourceID, SlotReport slotReport, Time time) {
        return getRpcService().connect(str, TaskExecutorGateway.class).handleAsync((taskExecutorGateway, th) -> {
            return th != null ? new RegistrationResponse.Decline(th.getMessage()) : registerTaskExecutorInternal(taskExecutorGateway, str, resourceID, slotReport);
        }, (Executor) getMainThreadExecutor());
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void heartbeatFromTaskManager(ResourceID resourceID, SlotReport slotReport) {
        this.taskManagerHeartbeatManager.receiveHeartbeat(resourceID, slotReport);
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void heartbeatFromJobManager(ResourceID resourceID) {
        this.jobManagerHeartbeatManager.receiveHeartbeat(resourceID, null);
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void disconnectTaskManager(ResourceID resourceID, Exception exc) {
        closeTaskManagerConnection(resourceID, exc);
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void disconnectJobManager(JobID jobID, Exception exc) {
        closeJobManagerConnection(jobID, exc);
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time time) {
        JobID jobId = slotRequest.getJobId();
        JobManagerRegistration jobManagerRegistration = this.jobManagerRegistrations.get(jobId);
        if (null == jobManagerRegistration) {
            return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
        }
        if (!Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
            return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " + jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
        }
        this.log.info("Request slot with profile {} for job {} with allocation id {}.", new Object[]{slotRequest.getResourceProfile(), slotRequest.getJobId(), slotRequest.getAllocationId()});
        try {
            this.slotManager.registerSlotRequest(slotRequest);
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (SlotManagerException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void notifySlotAvailable(InstanceID instanceID, SlotID slotID, AllocationID allocationID) {
        ResourceID resourceID = slotID.getResourceID();
        WorkerRegistration<WorkerType> workerRegistration = this.taskExecutors.get(resourceID);
        if (workerRegistration == null) {
            this.log.debug("Could not find registration for resource id {}. Discarding the slot availablemessage {}.", resourceID, slotID);
        } else if (Objects.equals(workerRegistration.getInstanceID(), instanceID)) {
            this.slotManager.freeSlot(slotID, allocationID);
        } else {
            this.log.debug("Invalid registration id for slot available message. This indicates an outdated request.");
        }
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void registerInfoMessageListener(String str) {
        if (this.infoMessageListeners.containsKey(str)) {
            this.log.warn("Receive a duplicate registration from info message listener on ({})", str);
        } else {
            getRpcService().connect(str, InfoMessageListenerRpcGateway.class).whenCompleteAsync((infoMessageListenerRpcGateway, th) -> {
                if (th != null) {
                    this.log.warn("Receive a registration from unreachable info message listener on ({})", str);
                } else {
                    this.log.info("Receive a registration from info message listener on ({})", str);
                    this.infoMessageListeners.put(str, infoMessageListenerRpcGateway);
                }
            }, (Executor) getMainThreadExecutor());
        }
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void unRegisterInfoMessageListener(String str) {
        this.infoMessageListeners.remove(str);
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public void shutDownCluster(ApplicationStatus applicationStatus, String str) {
        this.log.info("Shut down cluster because application is in {}, diagnostics {}.", applicationStatus, str);
        try {
            shutDownApplication(applicationStatus, str);
        } catch (ResourceManagerException e) {
            this.log.warn("Could not properly shutdown the application.", e);
        }
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
        return CompletableFuture.completedFuture(Integer.valueOf(this.taskExecutors.size()));
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public CompletableFuture<ResourceOverview> requestResourceOverview(Time time) {
        return CompletableFuture.completedFuture(new ResourceOverview(this.taskExecutors.size(), this.slotManager.getNumberRegisteredSlots(), this.slotManager.getNumberFreeSlots()));
    }

    @Override // org.apache.flink.runtime.resourcemanager.ResourceManagerGateway
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time time) {
        ArrayList arrayList = new ArrayList(this.taskExecutors.size());
        for (Map.Entry<ResourceID, WorkerRegistration<WorkerType>> entry : this.taskExecutors.entrySet()) {
            ResourceID key = entry.getKey();
            String address = entry.getValue().getTaskExecutorGateway().getAddress();
            arrayList.add(Tuple2.of(key, address.substring(0, address.lastIndexOf(47) + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + key.getResourceIdString()));
        }
        return CompletableFuture.completedFuture(arrayList);
    }

    private RegistrationResponse registerJobMasterInternal(final JobMasterGateway jobMasterGateway, JobID jobID, String str, ResourceID resourceID) {
        if (this.jobManagerRegistrations.containsKey(jobID)) {
            JobManagerRegistration jobManagerRegistration = this.jobManagerRegistrations.get(jobID);
            if (Objects.equals(jobManagerRegistration.getJobMasterId(), jobMasterGateway.getFencingToken())) {
                this.log.debug("Job manager {}@{} was already registered.", jobMasterGateway.getFencingToken(), str);
            } else {
                disconnectJobManager(jobManagerRegistration.getJobID(), new Exception("New job leader for job " + jobID + " found."));
                JobManagerRegistration jobManagerRegistration2 = new JobManagerRegistration(jobID, resourceID, jobMasterGateway);
                this.jobManagerRegistrations.put(jobID, jobManagerRegistration2);
                this.jmResourceIdRegistrations.put(resourceID, jobManagerRegistration2);
            }
        } else {
            JobManagerRegistration jobManagerRegistration3 = new JobManagerRegistration(jobID, resourceID, jobMasterGateway);
            this.jobManagerRegistrations.put(jobID, jobManagerRegistration3);
            this.jmResourceIdRegistrations.put(resourceID, jobManagerRegistration3);
        }
        this.log.info("Registered job manager {}@{} for job {}.", new Object[]{jobMasterGateway.getFencingToken(), str, jobID});
        this.jobManagerHeartbeatManager.monitorTarget(resourceID, new HeartbeatTarget<Void>() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.1
            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void receiveHeartbeat(ResourceID resourceID2, Void r3) {
            }

            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void requestHeartbeat(ResourceID resourceID2, Void r5) {
                jobMasterGateway.heartbeatFromResourceManager(resourceID2);
            }
        });
        return new JobMasterRegistrationSuccess(this.resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(), getFencingToken(), this.resourceId);
    }

    private RegistrationResponse registerTaskExecutorInternal(final TaskExecutorGateway taskExecutorGateway, String str, ResourceID resourceID, SlotReport slotReport) {
        WorkerRegistration<WorkerType> remove = this.taskExecutors.remove(resourceID);
        if (remove != null) {
            this.log.info("Replacing old instance of worker for ResourceID {}", resourceID);
            this.slotManager.unregisterTaskManager(remove.getInstanceID());
        }
        WorkerType workerStarted = workerStarted(resourceID);
        if (workerStarted == null) {
            this.log.warn("Discard registration from TaskExecutor {} at ({}) because the framework did not recognize it", resourceID, str);
            return new RegistrationResponse.Decline("unrecognized TaskExecutor");
        }
        WorkerRegistration<WorkerType> workerRegistration = new WorkerRegistration<>(taskExecutorGateway, workerStarted);
        this.taskExecutors.put(resourceID, workerRegistration);
        this.slotManager.registerTaskManager(workerRegistration, slotReport);
        this.taskManagerHeartbeatManager.monitorTarget(resourceID, new HeartbeatTarget<Void>() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.2
            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void receiveHeartbeat(ResourceID resourceID2, Void r3) {
            }

            @Override // org.apache.flink.runtime.heartbeat.HeartbeatTarget
            public void requestHeartbeat(ResourceID resourceID2, Void r5) {
                taskExecutorGateway.heartbeatFromResourceManager(resourceID2);
            }
        });
        return new TaskExecutorRegistrationSuccess(workerRegistration.getInstanceID(), this.resourceId, this.resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds());
    }

    private void clearState() {
        this.jobManagerRegistrations.clear();
        this.jmResourceIdRegistrations.clear();
        this.taskExecutors.clear();
        try {
            this.jobLeaderIdService.clear();
        } catch (Exception e) {
            onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
        }
    }

    protected void closeJobManagerConnection(JobID jobID, Exception exc) {
        JobManagerRegistration remove = this.jobManagerRegistrations.remove(jobID);
        if (remove == null) {
            this.log.debug("There was no registered job manager for job {}.", jobID);
            return;
        }
        ResourceID jobManagerResourceID = remove.getJobManagerResourceID();
        JobMasterGateway jobManagerGateway = remove.getJobManagerGateway();
        this.log.info("Disconnect job manager {}@{} for job {} from the resource manager.", new Object[]{remove.getJobMasterId(), jobManagerGateway.getAddress(), jobID});
        this.jobManagerHeartbeatManager.unmonitorTarget(jobManagerResourceID);
        this.jmResourceIdRegistrations.remove(jobManagerResourceID);
        jobManagerGateway.disconnectResourceManager(getFencingToken(), exc);
    }

    protected void closeTaskManagerConnection(ResourceID resourceID, Exception exc) {
        this.taskManagerHeartbeatManager.unmonitorTarget(resourceID);
        WorkerRegistration<WorkerType> remove = this.taskExecutors.remove(resourceID);
        if (remove == null) {
            this.log.debug("Could not find a registered task manager with the process id {}.", resourceID);
            return;
        }
        this.log.info("Task manager {} failed because {}.", resourceID, exc.getMessage());
        this.slotManager.unregisterTaskManager(remove.getInstanceID());
        remove.getTaskExecutorGateway().disconnectResourceManager(exc);
    }

    protected void removeJob(JobID jobID) {
        try {
            this.jobLeaderIdService.removeJob(jobID);
        } catch (Exception e) {
            this.log.warn("Could not properly remove the job {} from the job leader id service.", jobID, e);
        }
        if (this.jobManagerRegistrations.containsKey(jobID)) {
            disconnectJobManager(jobID, new Exception("Job " + jobID + "was removed"));
        }
    }

    protected void jobLeaderLostLeadership(JobID jobID, JobMasterId jobMasterId) {
        if (!this.jobManagerRegistrations.containsKey(jobID)) {
            this.log.debug("Discard job leader lost leadership for outdated leader {} for job {}.", jobMasterId, jobID);
        } else if (Objects.equals(this.jobManagerRegistrations.get(jobID).getJobMasterId(), jobMasterId)) {
            disconnectJobManager(jobID, new Exception("Job leader lost leadership."));
        } else {
            this.log.debug("Discarding job leader lost leadership, because a new job leader was found for job {}. ", jobID);
        }
    }

    protected void releaseResource(InstanceID instanceID) {
        ResourceID resourceID = null;
        Iterator<Map.Entry<ResourceID, WorkerRegistration<WorkerType>>> it = this.taskExecutors.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<ResourceID, WorkerRegistration<WorkerType>> next = it.next();
            if (next.getValue().getInstanceID().equals(instanceID)) {
                resourceID = next.getKey();
                break;
            }
        }
        if (resourceID == null) {
            this.slotManager.unregisterTaskManager(instanceID);
        } else if (stopWorker(resourceID)) {
            closeTaskManagerConnection(resourceID, new FlinkException("Worker was stopped."));
        } else {
            this.log.debug("Worker {} was not stopped.", resourceID);
        }
    }

    public void sendInfoMessage(final String str) {
        getRpcService().execute(new Runnable() { // from class: org.apache.flink.runtime.resourcemanager.ResourceManager.3
            @Override // java.lang.Runnable
            public void run() {
                InfoMessage infoMessage = new InfoMessage(str);
                Iterator it = ResourceManager.this.infoMessageListeners.values().iterator();
                while (it.hasNext()) {
                    ((InfoMessageListenerRpcGateway) it.next()).notifyInfoMessage(infoMessage);
                }
            }
        });
    }

    protected void onFatalError(Throwable th) {
        try {
            this.log.error("Fatal error occurred in ResourceManager.", th);
        } catch (Throwable th2) {
        }
        this.fatalErrorHandler.onFatalError(th);
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void grantLeadership(UUID uuid) {
        runAsyncWithoutFencing(() -> {
            ResourceManagerId resourceManagerId = new ResourceManagerId(uuid);
            this.log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), resourceManagerId);
            if (getFencingToken() != null) {
                clearState();
            }
            setFencingToken(resourceManagerId);
            this.slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
            getRpcService().execute(() -> {
                this.leaderElectionService.confirmLeaderSessionID(uuid);
            });
        });
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void revokeLeadership() {
        runAsyncWithoutFencing(() -> {
            this.log.info("ResourceManager {} was revoked leadership. Clearing fencing token.", getAddress());
            clearState();
            setFencingToken(null);
            this.slotManager.suspend();
        });
    }

    @Override // org.apache.flink.runtime.leaderelection.LeaderContender
    public void handleError(Exception exc) {
        onFatalError(new ResourceManagerException("Received an error from the LeaderElectionService.", exc));
    }

    protected abstract void initialize() throws ResourceManagerException;

    protected abstract void shutDownApplication(ApplicationStatus applicationStatus, String str) throws ResourceManagerException;

    @VisibleForTesting
    public abstract void startNewWorker(ResourceProfile resourceProfile);

    protected abstract WorkerType workerStarted(ResourceID resourceID);

    public abstract boolean stopWorker(ResourceID resourceID);
}
