package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Scheduler;
import akka.actor.Status;
import akka.dispatch.Futures;
import java.util.concurrent.Callable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobAttachmentClientActor.class */
public class JobAttachmentClientActor extends JobClientActor {
    private JobID jobID;
    private boolean successfullyRegisteredForJob;

    public JobAttachmentClientActor(LeaderRetrievalService leaderRetrievalService, FiniteDuration finiteDuration, boolean z) {
        super(leaderRetrievalService, finiteDuration, z);
        this.successfullyRegisteredForJob = false;
    }

    @Override // org.apache.flink.runtime.client.JobClientActor
    public void connectedToJobManager() {
        if (this.jobID == null || this.successfullyRegisteredForJob) {
            return;
        }
        tryToAttachToJob();
    }

    @Override // org.apache.flink.runtime.client.JobClientActor
    protected Class getClientMessageClass() {
        return JobClientMessages.AttachToJobAndWait.class;
    }

    @Override // org.apache.flink.runtime.client.JobClientActor
    public void handleCustomMessage(Object obj) {
        if (obj instanceof JobClientMessages.AttachToJobAndWait) {
            if (this.client != null) {
                this.LOG.error("Received repeated 'AttachToJobAndWait'");
                getSender().tell(decorateMessage(new Status.Failure(new Exception("Received repeated 'AttachToJobAndWait'"))), ActorRef.noSender());
                terminate();
                return;
            }
            this.jobID = ((JobClientMessages.AttachToJobAndWait) obj).jobID();
            if (this.jobID == null) {
                this.LOG.error("Received null JobID");
                sender().tell(decorateMessage(new Status.Failure(new Exception("JobID is null"))), getSelf());
                return;
            }
            this.LOG.info("Received JobID {}.", this.jobID);
            this.client = getSender();
            if (this.jobManager != null) {
                tryToAttachToJob();
                return;
            }
            return;
        }
        if (obj instanceof JobManagerMessages.RegisterJobClientSuccess) {
            logAndPrintMessage("Successfully registered at the JobManager for Job " + ((JobManagerMessages.RegisterJobClientSuccess) obj).jobId());
            this.successfullyRegisteredForJob = true;
            return;
        }
        if (obj instanceof JobManagerMessages.JobNotFound) {
            this.LOG.info("Couldn't register JobClient for JobID {}", ((JobManagerMessages.JobNotFound) obj).jobID());
            this.client.tell(decorateMessage(obj), getSelf());
            terminate();
        } else if (!JobClientMessages.getRegistrationTimeout().equals(obj)) {
            this.LOG.error("{} received unknown message: ", getClass());
        } else {
            if (this.successfullyRegisteredForJob) {
                return;
            }
            if (isClientConnected()) {
                this.client.tell(decorateMessage(new Status.Failure(new JobClientActorRegistrationTimeoutException("Registration for Job at the JobManager timed out. You may increase '" + AkkaOptions.CLIENT_TIMEOUT.key() + "' in case the JobManager needs more time to confirm the job client registration."))), getSelf());
            }
            terminate();
        }
    }

    private void tryToAttachToJob() {
        this.LOG.info("Sending message to JobManager {} to attach to job {} and wait for progress", this.jobManager, this.jobID);
        Futures.future(new Callable<Object>() { // from class: org.apache.flink.runtime.client.JobAttachmentClientActor.1
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                JobAttachmentClientActor.this.LOG.info("Attaching to job {} at the job manager {}.", JobAttachmentClientActor.this.jobID, JobAttachmentClientActor.this.jobManager.path());
                JobAttachmentClientActor.this.jobManager.tell(JobAttachmentClientActor.this.decorateMessage(new JobManagerMessages.RegisterJobClient(JobAttachmentClientActor.this.jobID, ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES)), JobAttachmentClientActor.this.getSelf());
                JobAttachmentClientActor.this.getContext().system().scheduler().scheduleOnce(JobAttachmentClientActor.this.timeout, JobAttachmentClientActor.this.getSelf(), JobAttachmentClientActor.this.decorateMessage(JobClientMessages.getRegistrationTimeout()), JobAttachmentClientActor.this.getContext().dispatcher(), ActorRef.noSender());
                return null;
            }
        }, getContext().dispatcher());
    }

    public static Props createActorProps(LeaderRetrievalService leaderRetrievalService, FiniteDuration finiteDuration, boolean z) {
        return Props.create(JobAttachmentClientActor.class, new Object[]{leaderRetrievalService, finiteDuration, Boolean.valueOf(z)});
    }
}
