package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.dispatch.OnSuccess;
import java.util.Objects;
import java.util.UUID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.FlinkUntypedActor;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.Preconditions;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClientActor.class */
public abstract class JobClientActor extends FlinkUntypedActor implements LeaderRetrievalListener {
    /** Service used to learn about the current JobManager leader; started in preStart() and stopped in postStop(). */
    private final LeaderRetrievalService leaderRetrievalService;
    /** Timeout used when resolving the JobManager actor reference and as the delay of the scheduled connection timeout. */
    protected final FiniteDuration timeout;
    /** When true, status updates are printed to System.out in addition to being logged. */
    private final boolean sysoutUpdates;
    /** Actor that receives the job result or a connection failure; not assigned anywhere in this class (presumably set by subclasses — verify). */
    protected ActorRef client;
    /** Set to true by terminate(); afterwards messages reaching the fallback branch of handleMessage are answered with a failure. */
    private boolean toBeTerminated = false;
    /** Currently connected JobManager, or ActorRef.noSender() while no JobManager is connected. */
    protected ActorRef jobManager = ActorRef.noSender();
    /** Leader session id taken from the most recent JobManagerLeaderAddress message; reset to null on every disconnect. */
    protected UUID leaderSessionID = null;
    /** Handle of the scheduled connection-timeout message; null until one is registered and again after unregisterConnectionTimeout(). */
    private Cancellable connectionTimeout = null;
    /** Id carried by the currently valid ConnectionTimeout message; timeout messages with a different id are ignored. */
    private UUID connectionTimeoutId = null;

    public JobClientActor(LeaderRetrievalService leaderRetrievalService, FiniteDuration finiteDuration, boolean z) {
        this.leaderRetrievalService = (LeaderRetrievalService) Preconditions.checkNotNull(leaderRetrievalService);
        this.timeout = (FiniteDuration) Preconditions.checkNotNull(finiteDuration);
        this.sysoutUpdates = z;
    }

    /**
     * Starts the leader retrieval service with this actor registered as its listener.
     *
     * @throws RuntimeException wrapping the original exception if the service cannot be started
     */
    @Override // akka.actor.UntypedActor, akka.actor.Actor
    public void preStart() {
        try {
            this.leaderRetrievalService.start(this);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are not lost in the log.
            this.LOG.error("Could not start the leader retrieval service.", e);
            throw new RuntimeException("Could not start the leader retrieval service.", e);
        }
    }

    /**
     * Stops the leader retrieval service. Failures are logged but deliberately not rethrown,
     * so shutting down the actor stays best-effort.
     */
    @Override // akka.actor.UntypedActor, akka.actor.Actor
    public void postStop() {
        try {
            this.leaderRetrievalService.stop();
        } catch (Exception e) {
            // Keep the best-effort behavior, but record why stopping failed instead of swallowing the cause.
            this.LOG.warn("Could not properly stop the leader retrieval service.", e);
        }
    }

    /**
     * Hook invoked by {@code handleMessage} after a {@code JobManagerActorRef} message has been
     * processed, i.e. right after the new JobManager reference has been stored and watched.
     */
    protected abstract void connectedToJobManager();

    /**
     * Handles a message that {@code handleMessage} does not process itself. It is also called
     * for a message of the client message class that arrives while no JobManager is connected.
     *
     * @param obj the received message
     */
    protected abstract void handleCustomMessage(Object obj);

    /**
     * Returns the class that {@code handleMessage} compares (via {@code equals}) with the runtime
     * class of an incoming message. When such a message arrives while no JobManager is connected,
     * a connection timeout is registered (if none is pending) before the message is passed on to
     * {@code handleCustomMessage}.
     *
     * @return the message class to match exactly
     */
    protected abstract Class getClientMessageClass();

    /**
     * Central message dispatcher of this actor.
     *
     * <p>Status updates are logged (and optionally printed), leader/connection related messages
     * are handled by the private helpers below, and everything else is forwarded to
     * {@link #handleCustomMessage(Object)} unless the actor is already about to terminate, in
     * which case the sender receives a {@code Status.Failure}.
     *
     * @param obj the received message
     */
    @Override // org.apache.flink.runtime.akka.FlinkUntypedActor
    protected void handleMessage(Object obj) {
        if (obj instanceof ExecutionGraphMessages.ExecutionStateChanged) {
            logAndPrintMessage((ExecutionGraphMessages.ExecutionStateChanged) obj);
        } else if (obj instanceof ExecutionGraphMessages.JobStatusChanged) {
            logAndPrintMessage((ExecutionGraphMessages.JobStatusChanged) obj);
        } else if (obj instanceof JobClientMessages.JobManagerLeaderAddress) {
            handleJobManagerLeaderAddress((JobClientMessages.JobManagerLeaderAddress) obj);
        } else if (obj instanceof JobClientMessages.JobManagerActorRef) {
            handleJobManagerActorRef((JobClientMessages.JobManagerActorRef) obj);
        } else if (obj instanceof JobManagerMessages.JobResultMessage) {
            handleJobResultMessage(obj);
        } else if (obj instanceof Terminated) {
            handleJobManagerTerminated((Terminated) obj);
        } else if (obj instanceof JobClientMessages.ConnectionTimeout) {
            handleConnectionTimeout((JobClientMessages.ConnectionTimeout) obj);
        } else if (!isJobManagerConnected() && getClientMessageClass().equals(obj.getClass())) {
            this.LOG.info("Received {} but there is no connection to a JobManager yet.", obj);
            // Start the timeout (unless one is already pending) so the request cannot wait forever.
            if (this.connectionTimeoutId == null) {
                registerConnectionTimeout();
            }
            handleCustomMessage(obj);
        } else if (!this.toBeTerminated) {
            handleCustomMessage(obj);
        } else {
            // terminate() has already been called, so tell the sender the request is not processed.
            String str = getClass().getName() + " is about to be terminated. Therefore, the job submission cannot be executed.";
            this.LOG.error(str);
            getSender().tell(decorateMessage(new Status.Failure(new Exception(str))), ActorRef.noSender());
        }
    }

    /** Handles a leader change: drops the old connection and resolves the new leader address, if any. */
    private void handleJobManagerLeaderAddress(JobClientMessages.JobManagerLeaderAddress leaderAddress) {
        // Only announce a new leader when we had been connected to a JobManager before.
        if (isJobManagerConnected()) {
            logAndPrintMessage("New JobManager elected. Connecting to " + leaderAddress.address());
        }
        disconnectFromJobManager();
        // Must be assigned after the disconnect, which resets the session id to null.
        this.leaderSessionID = leaderAddress.leaderSessionID();

        if (leaderAddress.address() == null) {
            // No leader address available: start the connection timeout if a client is waiting.
            if (isClientConnected() && this.connectionTimeoutId == null) {
                registerConnectionTimeout();
            }
            return;
        }

        // Resolve the address asynchronously and send the resulting reference back to ourselves.
        AkkaUtils.getActorRefFuture(leaderAddress.address(), getContext().system(), this.timeout).onSuccess(new OnSuccess<ActorRef>() {
            @Override // akka.dispatch.OnSuccess
            public void onSuccess(ActorRef actorRef) throws Throwable {
                JobClientActor.this.getSelf().tell(JobClientActor.this.decorateMessage(new JobClientMessages.JobManagerActorRef(actorRef)), ActorRef.noSender());
            }
        }, getContext().dispatcher());
    }

    /** Stores the resolved JobManager reference and notifies the subclass that the connection is up. */
    private void handleJobManagerActorRef(JobClientMessages.JobManagerActorRef jobManagerActorRef) {
        connectToJobManager(jobManagerActorRef.jobManager());
        logAndPrintMessage("Connected to JobManager at " + jobManagerActorRef.jobManager() + " with leader session id " + this.leaderSessionID + '.');
        connectedToJobManager();
    }

    /** Forwards the job result to the client (if one is connected) and terminates this actor. */
    private void handleJobResultMessage(Object result) {
        if (this.LOG.isDebugEnabled()) {
            this.LOG.debug("Received {} message from JobManager", result.getClass().getSimpleName());
        }
        // The message is decorated before terminate(), which resets the leader session id.
        if (isClientConnected()) {
            this.client.tell(decorateMessage(result), getSelf());
        }
        terminate();
    }

    /** Reacts to the death of a watched actor; only the currently connected JobManager is relevant. */
    private void handleJobManagerTerminated(Terminated terminated) {
        ActorRef actor = terminated.getActor();
        // jobManager is ActorRef.noSender() while disconnected, so it must not be dereferenced blindly.
        if (!isJobManagerConnected() || !this.jobManager.equals(actor)) {
            this.LOG.warn("Received 'Terminated' for unknown actor {}", actor);
            return;
        }
        this.LOG.info("Lost connection to JobManager {}. Triggering connection timeout.", this.jobManager.path());
        disconnectFromJobManager();
        if (isClientConnected() && this.connectionTimeoutId == null) {
            registerConnectionTimeout();
        }
    }

    /** Fails the client and terminates if the currently valid timeout fires while still disconnected. */
    private void handleConnectionTimeout(JobClientMessages.ConnectionTimeout connectionTimeoutMessage) {
        if (!Objects.equals(this.connectionTimeoutId, connectionTimeoutMessage.id())) {
            this.LOG.debug("Received outdated connection timeout.");
            return;
        }
        if (isJobManagerConnected()) {
            // A connection was established in the meantime; nothing to do.
            return;
        }
        // Build the failure before terminate(), which resets the leader session id.
        Object failure = decorateMessage(new Status.Failure(new JobClientActorConnectionTimeoutException("Lost connection to the JobManager.")));
        if (isClientConnected()) {
            this.client.tell(failure, getSelf());
        }
        terminate();
    }

    /**
     * Returns the leader session id currently stored by this actor.
     *
     * @return the stored leader session id; {@code null} after a disconnect
     */
    @Override // org.apache.flink.runtime.akka.FlinkUntypedActor
    protected UUID getLeaderSessionID() {
        final UUID currentSessionId = leaderSessionID;
        return currentSessionId;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Logs the given text at info level and, when console updates are enabled,
     * also prints it to {@code System.out}.
     *
     * @param str the text to report
     */
    public void logAndPrintMessage(String str) {
        this.LOG.info(str);
        if (!this.sysoutUpdates) {
            return;
        }
        System.out.println(str);
    }

    /**
     * Reports an execution state change: logs its string form at info level and,
     * when console updates are enabled, also prints it to {@code System.out}.
     */
    private void logAndPrintMessage(ExecutionGraphMessages.ExecutionStateChanged executionStateChanged) {
        final String text = executionStateChanged.toString();
        this.LOG.info(text);
        if (!this.sysoutUpdates) {
            return;
        }
        System.out.println(text);
    }

    /**
     * Reports a job status change. Normally only the status text is logged/printed; when the
     * job is {@code FAILING} and an error is attached, the error is reported as well.
     */
    private void logAndPrintMessage(ExecutionGraphMessages.JobStatusChanged jobStatusChanged) {
        final boolean failingWithError =
                jobStatusChanged.newJobStatus() == JobStatus.FAILING && jobStatusChanged.error() != null;
        final String text = jobStatusChanged.toString();

        if (failingWithError) {
            // The logger receives the throwable obtained through SerializedThrowable.get(...).
            Throwable cause = SerializedThrowable.get(jobStatusChanged.error(), getClass().getClassLoader());
            this.LOG.info(text, cause);
            if (this.sysoutUpdates) {
                System.out.println(text);
                // The console output uses the error exactly as carried by the message.
                jobStatusChanged.error().printStackTrace(System.out);
            }
        } else {
            this.LOG.info(text);
            if (this.sysoutUpdates) {
                System.out.println(text);
            }
        }
    }

    /**
     * Leader retrieval callback: wraps the new leader address and session id in a
     * {@code JobManagerLeaderAddress} message and sends it to this actor itself.
     *
     * @param str the leader address as reported by the retrieval service
     * @param uuid the leader session id as reported by the retrieval service
     */
    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
    public void notifyLeaderAddress(String str, UUID uuid) {
        final ActorRef self = getSelf();
        final Object leaderAddressMessage = decorateMessage(new JobClientMessages.JobManagerLeaderAddress(str, uuid));
        self.tell(leaderAddressMessage, getSelf());
    }

    /**
     * Leader retrieval callback for errors: logs the exception and then sends this actor a
     * {@code PoisonPill}.
     *
     * @param exc the error reported by the leader retrieval service
     */
    @Override // org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener
    public void handleError(Exception exc) {
        this.LOG.error("Error occurred in the LeaderRetrievalService.", exc);

        final ActorRef self = getSelf();
        final Object poisonPill = decorateMessage(PoisonPill.getInstance());
        self.tell(poisonPill, getSelf());
    }

    /**
     * Drops the connection to the current JobManager: stops watching it (if one is set),
     * resets the reference to {@code ActorRef.noSender()} and clears the leader session id.
     */
    private void disconnectFromJobManager() {
        final ActorRef current = this.jobManager;
        this.LOG.info("Disconnect from JobManager {}.", current);

        final boolean connected = current != ActorRef.noSender();
        if (connected) {
            getContext().unwatch(current);
            this.jobManager = ActorRef.noSender();
        }

        // The session id is cleared regardless of whether a JobManager was connected.
        this.leaderSessionID = null;
    }

    /**
     * Makes the given actor the current JobManager: stops watching the previously connected
     * JobManager (if any), stores and watches the new one, and cancels a pending connection
     * timeout.
     *
     * @param actorRef reference of the JobManager to connect to
     */
    private void connectToJobManager(ActorRef actorRef) {
        this.LOG.info("Connect to JobManager {}.", actorRef);
        // Release the death watch on the OLD JobManager. Unwatching the new reference (which is
        // not watched yet) would leave the previous JobManager watched after it is replaced.
        if (this.jobManager != ActorRef.noSender()) {
            getContext().unwatch(this.jobManager);
        }
        this.jobManager = actorRef;
        getContext().watch(actorRef);
        unregisterConnectionTimeout();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initiates shutdown of this actor: marks it as terminating, disconnects from the
     * JobManager and finally sends itself a {@code PoisonPill}.
     */
    public void terminate() {
        this.LOG.info("Terminate JobClientActor.");
        this.toBeTerminated = true;
        disconnectFromJobManager();

        // The pill is decorated only after the disconnect above has run.
        final Object poisonPill = decorateMessage(PoisonPill.getInstance());
        getSelf().tell(poisonPill, ActorRef.noSender());
    }

    /**
     * Tells whether a JobManager reference is currently set.
     *
     * @return {@code true} unless {@code jobManager} is {@code ActorRef.noSender()}
     */
    private boolean isJobManagerConnected() {
        final ActorRef none = ActorRef.noSender();
        return none != this.jobManager;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Tells whether a client reference is currently set.
     *
     * @return {@code true} unless {@code client} is {@code ActorRef.noSender()}
     */
    public boolean isClientConnected() {
        final ActorRef none = ActorRef.noSender();
        return none != this.client;
    }

    /**
     * Schedules a fresh {@code ConnectionTimeout} message to this actor after {@code timeout}.
     * A previously scheduled timeout is cancelled first, and a new random id is generated so
     * that handleMessage can recognise outdated timeout messages.
     */
    private void registerConnectionTimeout() {
        final Cancellable previous = this.connectionTimeout;
        if (previous != null) {
            previous.cancel();
        }

        final UUID freshId = UUID.randomUUID();
        this.connectionTimeoutId = freshId;

        final Object timeoutMessage = decorateMessage(new JobClientMessages.ConnectionTimeout(freshId));
        this.connectionTimeout = getContext().system().scheduler().scheduleOnce(
                this.timeout,
                getSelf(),
                timeoutMessage,
                getContext().dispatcher(),
                ActorRef.noSender());
    }

    /**
     * Cancels the scheduled connection timeout, if there is one, and forgets both its handle
     * and its id. Does nothing when no timeout handle is stored.
     */
    private void unregisterConnectionTimeout() {
        final Cancellable pending = this.connectionTimeout;
        if (pending == null) {
            return;
        }
        pending.cancel();
        this.connectionTimeout = null;
        this.connectionTimeoutId = null;
    }
}
