package org.apache.flink.runtime.taskexecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.registration.RegisteredRpcConnection;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistration;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/JobLeaderService.class */
public class JobLeaderService {
    private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
    private final TaskManagerLocation ownLocation;
    private final Map<JobID, Tuple2<LeaderRetrievalService, JobManagerLeaderListener>> jobLeaderServices = new HashMap(4);
    private volatile State state = State.CREATED;
    private String ownerAddress = null;
    private RpcService rpcService = null;
    private HighAvailabilityServices highAvailabilityServices = null;
    private JobLeaderListener jobLeaderListener = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/JobLeaderService$JobManagerLeaderListener.class */
    public final class JobManagerLeaderListener implements LeaderRetrievalListener {
        private final JobID jobId;
        private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
        private volatile boolean stopped;
        private volatile JobMasterId currentJobMasterId;

        /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/JobLeaderService$JobManagerLeaderListener$JobManagerRegisteredRpcConnection.class */
        private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
            JobManagerRegisteredRpcConnection(Logger logger, String str, JobMasterId jobMasterId, Executor executor) {
                super(logger, str, jobMasterId, executor);
            }

            @Override // org.apache.flink.runtime.registration.RegisteredRpcConnection
            protected RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
                return new JobManagerRetryingRegistration(JobLeaderService.LOG, JobLeaderService.this.rpcService, "JobManager", JobMasterGateway.class, getTargetAddress(), getTargetLeaderId(), JobLeaderService.this.ownerAddress, JobLeaderService.this.ownLocation);
            }

            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.flink.runtime.registration.RegisteredRpcConnection
            public void onRegistrationSuccess(JMTMRegistrationSuccess jMTMRegistrationSuccess) {
                if (!Objects.equals(getTargetLeaderId(), JobManagerLeaderListener.this.currentJobMasterId)) {
                    this.log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId());
                } else {
                    this.log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                    JobLeaderService.this.jobLeaderListener.jobManagerGainedLeadership(JobManagerLeaderListener.this.jobId, getTargetGateway(), jMTMRegistrationSuccess);
                }
            }

            @Override // org.apache.flink.runtime.registration.RegisteredRpcConnection
            protected void onRegistrationFailure(Throwable th) {
                if (!Objects.equals(getTargetLeaderId(), JobManagerLeaderListener.this.currentJobMasterId)) {
                    this.log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", new Object[]{getTargetAddress(), getTargetLeaderId(), th});
                } else {
                    this.log.info("Failed to register at job  manager {} for job {}.", getTargetAddress(), JobManagerLeaderListener.this.jobId);
                    JobLeaderService.this.jobLeaderListener.handleError(th);
                }
            }
        }

        private JobManagerLeaderListener(JobID jobID) {
            this.jobId = (JobID) Preconditions.checkNotNull(jobID);
            this.stopped = false;
            this.rpcConnection = null;
            this.currentJobMasterId = null;
        }

        public void stop() {
            this.stopped = true;
            if (this.rpcConnection != null) {
                this.rpcConnection.close();
            }
        }

        @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
        public void notifyLeaderAddress(String str, UUID uuid) {
            if (this.stopped) {
                JobLeaderService.LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. However, the service is no longer running.", JobLeaderService.class.getSimpleName(), this.jobId);
                return;
            }
            JobMasterId jobMasterId = uuid != null ? new JobMasterId(uuid) : null;
            JobLeaderService.LOG.debug("New leader information for job {}. Address: {}, leader id: {}.", new Object[]{this.jobId, str, jobMasterId});
            if (str == null || str.isEmpty()) {
                if (this.rpcConnection != null) {
                    this.rpcConnection.close();
                }
                JobLeaderService.this.jobLeaderListener.jobManagerLostLeadership(this.jobId, this.currentJobMasterId);
                this.currentJobMasterId = jobMasterId;
                return;
            }
            this.currentJobMasterId = jobMasterId;
            if (this.rpcConnection == null) {
                this.rpcConnection = new JobManagerRegisteredRpcConnection(JobLeaderService.LOG, str, jobMasterId, JobLeaderService.this.rpcService.getExecutor());
            } else if (!Objects.equals(jobMasterId, this.rpcConnection.getTargetLeaderId())) {
                this.rpcConnection.close();
                this.rpcConnection = new JobManagerRegisteredRpcConnection(JobLeaderService.LOG, str, jobMasterId, JobLeaderService.this.rpcService.getExecutor());
            }
            if (this.stopped) {
                this.rpcConnection.close();
            } else {
                JobLeaderService.LOG.info("Try to register at job manager {} with leader id {}.", str, uuid);
                this.rpcConnection.start();
            }
        }

        @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
        public void handleError(Exception exc) {
            if (this.stopped) {
                JobLeaderService.LOG.debug("{}'s leader retrieval listener reported an exception for job {}. However, the service is no longer running.", new Object[]{JobLeaderService.class.getSimpleName(), this.jobId, exc});
            } else {
                JobLeaderService.this.jobLeaderListener.handleError(exc);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/JobLeaderService$JobManagerRetryingRegistration.class */
    private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
        private final String taskManagerRpcAddress;
        private final TaskManagerLocation taskManagerLocation;

        JobManagerRetryingRegistration(Logger logger, RpcService rpcService, String str, Class<JobMasterGateway> cls, String str2, JobMasterId jobMasterId, String str3, TaskManagerLocation taskManagerLocation) {
            super(logger, rpcService, str, cls, str2, jobMasterId);
            this.taskManagerRpcAddress = str3;
            this.taskManagerLocation = (TaskManagerLocation) Preconditions.checkNotNull(taskManagerLocation);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // org.apache.flink.runtime.registration.RetryingRegistration
        public CompletableFuture<RegistrationResponse> invokeRegistration(JobMasterGateway jobMasterGateway, JobMasterId jobMasterId, long j) throws Exception {
            return jobMasterGateway.registerTaskManager(this.taskManagerRpcAddress, this.taskManagerLocation, Time.milliseconds(j));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/JobLeaderService$State.class */
    private enum State {
        CREATED,
        STARTED,
        STOPPED
    }

    public JobLeaderService(TaskManagerLocation taskManagerLocation) {
        this.ownLocation = (TaskManagerLocation) Preconditions.checkNotNull(taskManagerLocation);
    }

    public void start(String str, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, JobLeaderListener jobLeaderListener) {
        if (State.CREATED != this.state) {
            throw new IllegalStateException("The service has already been started.");
        }
        LOG.info("Start job leader service.");
        this.ownerAddress = (String) Preconditions.checkNotNull(str);
        this.rpcService = (RpcService) Preconditions.checkNotNull(rpcService);
        this.highAvailabilityServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices);
        this.jobLeaderListener = (JobLeaderListener) Preconditions.checkNotNull(jobLeaderListener);
        this.state = State.STARTED;
    }

    public void stop() throws Exception {
        LOG.info("Stop job leader service.");
        if (State.STARTED == this.state) {
            for (Tuple2<LeaderRetrievalService, JobManagerLeaderListener> tuple2 : this.jobLeaderServices.values()) {
                LeaderRetrievalService leaderRetrievalService = tuple2.f0;
                tuple2.f1.stop();
                leaderRetrievalService.stop();
            }
            this.jobLeaderServices.clear();
        }
        this.state = State.STOPPED;
    }

    public boolean containsJob(JobID jobID) {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");
        return this.jobLeaderServices.containsKey(jobID);
    }

    public void removeJob(JobID jobID) throws Exception {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");
        Tuple2<LeaderRetrievalService, JobManagerLeaderListener> remove = this.jobLeaderServices.remove(jobID);
        if (remove != null) {
            LOG.info("Remove job {} from job leader monitoring.", jobID);
            LeaderRetrievalService leaderRetrievalService = remove.f0;
            JobManagerLeaderListener jobManagerLeaderListener = remove.f1;
            leaderRetrievalService.stop();
            jobManagerLeaderListener.stop();
        }
    }

    public void addJob(JobID jobID, String str) throws Exception {
        Preconditions.checkState(State.STARTED == this.state, "The service is currently not running.");
        LOG.info("Add job {} for job leader monitoring.", jobID);
        LeaderRetrievalService jobManagerLeaderRetriever = this.highAvailabilityServices.getJobManagerLeaderRetriever(jobID, str);
        JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobID);
        jobManagerLeaderRetriever.start(jobManagerLeaderListener);
        this.jobLeaderServices.put(jobID, Tuple2.of(jobManagerLeaderRetriever, jobManagerLeaderListener));
    }
}
