package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.BlobCache;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.AcceptFunction;
import org.apache.flink.runtime.concurrent.ApplyFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.PartitionInfo;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.registration.RegistrationConnectionListener;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcMethod;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlot;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor.class */
public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
    /** Base name handed to {@code AkkaRpcServiceUtils.createRandomName} when the endpoint is constructed. */
    public static final String TASK_MANAGER_NAME = "taskmanager";
    /** Location of this task manager; supplied at construction time. */
    private final TaskManagerLocation taskManagerLocation;
    /**
     * NOTE(review): presumably the exclusive upper bound for an accepted blob server port
     * (valid TCP ports end at 65535). Not referenced in the visible part of this file;
     * confirm at the usage site.
     */
    public static final int MAX_BLOB_PORT = 65536;
    /** Source of the resource manager leader retriever; also handed to the job leader service in {@code start()}. */
    private final HighAvailabilityServices haServices;
    /** Task manager configuration; read here for the number of slots and the timeout. */
    private final TaskManagerConfiguration taskManagerConfiguration;
    /** Passed to every {@code Task} created by {@code submitTask}; shut down in {@code shutDown()}. */
    private final IOManager ioManager;
    /** Passed to every {@code Task} created by {@code submitTask}; shut down in {@code shutDown()}. */
    private final MemoryManager memoryManager;
    /** Passed to every {@code Task}; also used to release result partitions; shut down in {@code shutDown()}. */
    private final NetworkEnvironment networkEnvironment;
    /** Metric registry supplied at construction time. */
    private final MetricRegistry metricRegistry;
    /** Heartbeat manager that answers heartbeat requests coming from job managers. */
    private final HeartbeatManager<Void, Void> jobManagerHeartbeatManager;
    /** Heartbeat manager that answers heartbeat requests coming from the resource manager. */
    private final HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;
    /** Receives fatal errors reported by tasks through {@code TaskManagerActionsImpl}. */
    private final FatalErrorHandler fatalErrorHandler;
    /** Metric group under which a per-task metric group is registered on task submission. */
    private final TaskManagerMetricGroup taskManagerMetricGroup;
    /** Passed to every {@code Task} created by {@code submitTask}. */
    private final BroadcastVariableManager broadcastVariableManager;
    /** Passed to every {@code Task} created by {@code submitTask}; shut down in {@code shutDown()}. */
    private final FileCache fileCache;
    /** Current connection (or connection attempt) to the resource manager; {@code null} when there is none. */
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
    /** Job manager connections keyed by resource ID; consulted when a job manager heartbeat times out. */
    private Map<ResourceID, JobManagerConnection> jobManagerConnections;
    /** Table of task slots and the tasks running in them. */
    private final TaskSlotTable taskSlotTable;
    /** Job manager connections looked up by job ID. */
    private final JobManagerTable jobManagerTable;
    /** Service to which jobs are added when a slot is requested for a job without a known job manager. */
    private final JobLeaderService jobLeaderService;

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobLeaderListenerImpl.class */
    /**
     * Job leader listener that forwards every notification to the enclosing {@link TaskExecutor}.
     *
     * <p>Leadership changes are wrapped in a {@link Runnable} and handed to {@code runAsync};
     * errors are passed to {@code onFatalErrorAsync}.
     */
    private final class JobLeaderListenerImpl implements JobLeaderListener {
        private JobLeaderListenerImpl() {
        }

        /** Schedules the establishment of the connection to the job manager that gained leadership. */
        @Override
        public void jobManagerGainedLeadership(final JobID jobID, final JobMasterGateway jobMasterGateway, final UUID uuid, final JMTMRegistrationSuccess jMTMRegistrationSuccess) {
            final Runnable connectToLeader = new Runnable() {
                @Override
                public void run() {
                    establishJobManagerConnection(jobID, jobMasterGateway, uuid, jMTMRegistrationSuccess);
                }
            };
            runAsync(connectToLeader);
        }

        /** Logs the leadership loss and schedules closing the connection to the affected job manager. */
        @Override
        public void jobManagerLostLeadership(final JobID jobID, UUID uuid) {
            log.info("JobManager for job {} with leader id {} lost leadership.", jobID, uuid);
            final Runnable dropConnection = new Runnable() {
                @Override
                public void run() {
                    // The cause is created when the action runs, not when it is scheduled.
                    final Exception cause = new Exception("Job leader for job id " + jobID + " lost leadership.");
                    closeJobManagerConnection(jobID, cause);
                }
            };
            runAsync(dropConnection);
        }

        /** Reports the error as a fatal error of the task executor. */
        @Override
        public void handleError(Throwable th) {
            onFatalErrorAsync(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$JobManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for job managers. A timeout closes the connection registered for the
     * timed-out resource ID, if there is one; no heartbeat payload is exchanged.
     */
    private class JobManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
        private JobManagerHeartbeatListener() {
        }

        /** Schedules closing the job manager connection registered under {@code resourceID}. */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            final Runnable handleTimeout = new Runnable() {
                @Override
                public void run() {
                    log.info("The heartbeat of JobManager with id {} timed out.", resourceID);
                    // A single lookup is enough: an unknown id yields null as well.
                    final JobManagerConnection timedOutConnection = jobManagerConnections.get(resourceID);
                    if (timedOutConnection != null) {
                        final TimeoutException cause = new TimeoutException("The heartbeat of JobManager with id " + resourceID + " timed out.");
                        closeJobManagerConnection(timedOutConnection.getJobID(), cause);
                    }
                }
            };
            runAsync(handleTimeout);
        }

        /** Intentionally empty: the reported payload is ignored. */
        @Override
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        /** Returns an already completed future holding {@code null}. */
        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerHeartbeatListener.class */
    /**
     * Heartbeat listener for the resource manager. A timeout closes the resource manager
     * connection; no heartbeat payload is exchanged.
     */
    private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, Void> {
        private ResourceManagerHeartbeatListener() {
        }

        /** Schedules closing the resource manager connection with a {@link TimeoutException} as cause. */
        @Override
        public void notifyHeartbeatTimeout(final ResourceID resourceID) {
            final Runnable handleTimeout = new Runnable() {
                @Override
                public void run() {
                    log.info("The heartbeat of ResourceManager with id {} timed out.", resourceID);
                    final TimeoutException cause = new TimeoutException("The heartbeat of ResourceManager with id " + resourceID + " timed out.");
                    closeResourceManagerConnection(cause);
                }
            };
            runAsync(handleTimeout);
        }

        /** Intentionally empty: the reported payload is ignored. */
        @Override
        public void reportPayload(ResourceID resourceID, Void r3) {
        }

        /** Returns an already completed future holding {@code null}. */
        @Override
        public Future<Void> retrievePayload() {
            return FlinkCompletableFuture.completed(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerLeaderListener.class */
    /**
     * Leader retrieval listener for the resource manager. New leader information is forwarded
     * to {@code notifyOfNewResourceManagerLeader}; errors are reported as fatal.
     */
    private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        private ResourceManagerLeaderListener() {
        }

        /** Schedules the handling of the announced leader address and leader id. */
        @Override
        public void notifyLeaderAddress(final String str, final UUID uuid) {
            final Runnable announceLeader = new Runnable() {
                @Override
                public void run() {
                    notifyOfNewResourceManagerLeader(str, uuid);
                }
            };
            runAsync(announceLeader);
        }

        /** Reports the error as a fatal error of the task executor. */
        @Override
        public void handleError(Exception exc) {
            onFatalErrorAsync(exc);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$ResourceManagerRegistrationListener.class */
    /**
     * Listener for the outcome of the registration at the resource manager. Success leads to
     * {@code establishResourceManagerConnection}; failure is reported as a fatal error.
     */
    public final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorRegistrationSuccess> {
        private ResourceManagerRegistrationListener() {
        }

        /** Reads the resource manager id from the response right away and schedules the connection setup. */
        @Override
        public void onRegistrationSuccess(TaskExecutorRegistrationSuccess taskExecutorRegistrationSuccess) {
            // Read eagerly, before the asynchronous action is scheduled.
            final ResourceID registeredAt = taskExecutorRegistrationSuccess.getResourceManagerId();
            final Runnable connect = new Runnable() {
                @Override
                public void run() {
                    establishResourceManagerConnection(registeredAt);
                }
            };
            runAsync(connect);
        }

        /** Reports the registration failure as a fatal error of the task executor. */
        @Override
        public void onRegistrationFailure(Throwable th) {
            onFatalErrorAsync(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$SlotActionsImpl.class */
    /**
     * Slot actions that delegate to the equally named methods of the enclosing
     * {@link TaskExecutor}, each wrapped in a {@link Runnable} handed to {@code runAsync}.
     */
    private class SlotActionsImpl implements SlotActions {
        private SlotActionsImpl() {
        }

        /** Schedules freeing the slot with the given allocation id. */
        @Override
        public void freeSlot(final AllocationID allocationID) {
            final Runnable release = new Runnable() {
                @Override
                public void run() {
                    // Explicit qualification is required: this class declares freeSlot as well.
                    TaskExecutor.this.freeSlot(allocationID);
                }
            };
            runAsync(release);
        }

        /** Schedules the timeout handling for the slot with the given allocation id and ticket. */
        @Override
        public void timeoutSlot(final AllocationID allocationID, final UUID uuid) {
            final Runnable expire = new Runnable() {
                @Override
                public void run() {
                    // Explicit qualification is required: this class declares timeoutSlot as well.
                    TaskExecutor.this.timeoutSlot(allocationID, uuid);
                }
            };
            runAsync(expire);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutor$TaskManagerActionsImpl.class */
    public final class TaskManagerActionsImpl implements TaskManagerActions {
        private final UUID jobMasterLeaderId;
        private final JobMasterGateway jobMasterGateway;

        private TaskManagerActionsImpl(UUID uuid, JobMasterGateway jobMasterGateway) {
            this.jobMasterLeaderId = (UUID) Preconditions.checkNotNull(uuid);
            this.jobMasterGateway = (JobMasterGateway) Preconditions.checkNotNull(jobMasterGateway);
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void notifyFinalState(final ExecutionAttemptID executionAttemptID) {
            TaskExecutor.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.taskexecutor.TaskExecutor.TaskManagerActionsImpl.1
                @Override // java.lang.Runnable
                public void run() {
                    TaskExecutor.this.unregisterTaskAndNotifyFinalState(TaskManagerActionsImpl.this.jobMasterLeaderId, TaskManagerActionsImpl.this.jobMasterGateway, executionAttemptID);
                }
            });
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void notifyFatalError(String str, Throwable th) {
            TaskExecutor.this.log.error(str, th);
            TaskExecutor.this.fatalErrorHandler.onFatalError(th);
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void failTask(final ExecutionAttemptID executionAttemptID, final Throwable th) {
            TaskExecutor.this.runAsync(new Runnable() { // from class: org.apache.flink.runtime.taskexecutor.TaskExecutor.TaskManagerActionsImpl.2
                @Override // java.lang.Runnable
                public void run() {
                    TaskExecutor.this.failTask(executionAttemptID, th);
                }
            });
        }

        @Override // org.apache.flink.runtime.taskmanager.TaskManagerActions
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            TaskExecutor.this.updateTaskExecutionState(this.jobMasterLeaderId, this.jobMasterGateway, taskExecutionState);
        }
    }

    /**
     * Creates a task executor from its collaborating services.
     *
     * <p>All service arguments are null-checked before they are used, so a missing argument
     * fails at the corresponding check instead of at the first dereference. The endpoint
     * name is a random name derived from {@link #TASK_MANAGER_NAME}.
     *
     * @throws NullPointerException if one of the checked arguments is {@code null}
     * @throws IllegalArgumentException if the configured number of slots is not positive
     */
    public TaskExecutor(RpcService rpcService, TaskManagerConfiguration taskManagerConfiguration, TaskManagerLocation taskManagerLocation, MemoryManager memoryManager, IOManager iOManager, NetworkEnvironment networkEnvironment, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, TaskManagerMetricGroup taskManagerMetricGroup, BroadcastVariableManager broadcastVariableManager, FileCache fileCache, TaskSlotTable taskSlotTable, JobManagerTable jobManagerTable, JobLeaderService jobLeaderService, FatalErrorHandler fatalErrorHandler) {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
        // Null-check the configuration before reading the slot count from it.
        this.taskManagerConfiguration = (TaskManagerConfiguration) Preconditions.checkNotNull(taskManagerConfiguration);
        Preconditions.checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has to be larger than 0.");
        // Only used below to create the heartbeat managers, but checked up front like the rest.
        Preconditions.checkNotNull(heartbeatServices);
        this.taskManagerLocation = (TaskManagerLocation) Preconditions.checkNotNull(taskManagerLocation);
        this.memoryManager = (MemoryManager) Preconditions.checkNotNull(memoryManager);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.networkEnvironment = (NetworkEnvironment) Preconditions.checkNotNull(networkEnvironment);
        this.haServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices);
        this.metricRegistry = (MetricRegistry) Preconditions.checkNotNull(metricRegistry);
        this.taskSlotTable = (TaskSlotTable) Preconditions.checkNotNull(taskSlotTable);
        this.fatalErrorHandler = (FatalErrorHandler) Preconditions.checkNotNull(fatalErrorHandler);
        this.taskManagerMetricGroup = (TaskManagerMetricGroup) Preconditions.checkNotNull(taskManagerMetricGroup);
        this.broadcastVariableManager = (BroadcastVariableManager) Preconditions.checkNotNull(broadcastVariableManager);
        this.fileCache = (FileCache) Preconditions.checkNotNull(fileCache);
        this.jobManagerTable = (JobManagerTable) Preconditions.checkNotNull(jobManagerTable);
        this.jobLeaderService = (JobLeaderService) Preconditions.checkNotNull(jobLeaderService);
        // Typed map instead of a raw HashMap, which avoids the unchecked assignment.
        this.jobManagerConnections = new HashMap<>(4);
        this.jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(getResourceID(), new JobManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
        this.resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(getResourceID(), new ResourceManagerHeartbeatListener(), rpcService.getScheduledExecutor(), this.log);
    }

    /**
     * Starts the RPC endpoint and then the services of this task executor: the resource
     * manager leader retrieval, the task slot table and the job leader service.
     *
     * <p>A failure to start the leader retrieval is reported through
     * {@code onFatalErrorAsync}; the remaining services are started regardless.
     */
    @Override
    public void start() throws Exception {
        super.start();

        try {
            haServices.getResourceManagerLeaderRetriever().start(new ResourceManagerLeaderListener());
        } catch (Exception startFailure) {
            onFatalErrorAsync(startFailure);
        }

        final SlotActionsImpl slotActions = new SlotActionsImpl();
        taskSlotTable.start(slotActions);

        final JobLeaderListenerImpl jobLeaderListener = new JobLeaderListenerImpl();
        jobLeaderService.start(getAddress(), getRpcService(), haServices, jobLeaderListener);
    }

    /**
     * Shuts down the services of this task executor and then the RPC endpoint itself.
     *
     * <p>Every step is attempted even when an earlier one fails, so a single failing
     * component no longer leaves the remaining components (or the endpoint) running.
     * Failures are combined through {@code ExceptionUtils.firstOrSuppressed} and rethrown
     * once all steps have been attempted.
     *
     * @throws Exception if at least one shutdown step failed
     */
    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void shutDown() throws Exception {
        this.log.info("Stopping TaskManager {}.", getAddress());
        Exception exc = null;
        try {
            this.taskSlotTable.stop();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            if (isConnectedToResourceManager()) {
                this.resourceManagerConnection.close();
            }
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.jobManagerHeartbeatManager.stop();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.resourceManagerHeartbeatManager.stop();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.ioManager.shutdown();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.memoryManager.shutdown();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.networkEnvironment.shutdown();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            this.fileCache.shutdown();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        try {
            super.shutDown();
        } catch (Exception e) {
            exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
        }
        if (exc != null) {
            ExceptionUtils.rethrowException(exc, "Error while shutting the TaskExecutor down.");
        }
        this.log.info("Stopped TaskManager {}.", getAddress());
    }

    /**
     * Creates a task from the deployment descriptor, registers it in the task slot table and
     * starts its thread.
     *
     * <p>The submission is rejected when no job manager is known for the job, when the given
     * leader id differs from the one of the known job manager, when no active slot exists for
     * the job and allocation id, or when the slot table does not accept the task.
     *
     * @param taskDeploymentDescriptor description of the task to deploy
     * @param uuid leader id of the submitting job manager
     * @return an acknowledgement once the task thread has been started
     * @throws TaskSubmissionException if the task could not be submitted
     */
    @RpcMethod
    public Acknowledge submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, UUID uuid) throws TaskSubmissionException {
        try {
            final ClassLoader classLoader = getClass().getClassLoader();
            final JobInformation jobInformation = taskDeploymentDescriptor.getSerializedJobInformation().deserializeValue(classLoader);
            final TaskInformation taskInformation = taskDeploymentDescriptor.getSerializedTaskInformation().deserializeValue(classLoader);
            final JobID jobId = jobInformation.getJobId();

            final JobManagerConnection connection = jobManagerTable.get(jobId);
            if (connection == null) {
                final String noJobManager = "Could not submit task because there is no JobManager associated for the job " + jobId + '.';
                log.debug(noJobManager);
                throw new TaskSubmissionException(noJobManager);
            }

            final boolean leaderMatches = Objects.equals(connection.getLeaderId(), uuid);
            if (!leaderMatches) {
                final String wrongLeader = "Rejecting the task submission because the job manager leader id " + uuid + " does not match the expected job manager leader id " + connection.getLeaderId() + '.';
                log.debug(wrongLeader);
                throw new TaskSubmissionException(wrongLeader);
            }

            final boolean slotAvailable = taskSlotTable.existsActiveSlot(jobId, taskDeploymentDescriptor.getAllocationId());
            if (!slotAvailable) {
                final String noSlot = "No task slot allocated for job ID " + jobId + " and allocation ID " + taskDeploymentDescriptor.getAllocationId() + '.';
                log.debug(noSlot);
                throw new TaskSubmissionException(noSlot);
            }

            final TaskMetricGroup taskMetricGroup = taskManagerMetricGroup.addTaskForJob(
                    jobInformation.getJobId(),
                    jobInformation.getJobName(),
                    taskInformation.getJobVertexId(),
                    taskDeploymentDescriptor.getExecutionAttemptId(),
                    taskInformation.getTaskName(),
                    taskDeploymentDescriptor.getSubtaskIndex(),
                    taskDeploymentDescriptor.getAttemptNumber());

            final Task newTask = new Task(
                    jobInformation,
                    taskInformation,
                    taskDeploymentDescriptor.getExecutionAttemptId(),
                    taskDeploymentDescriptor.getAllocationId(),
                    taskDeploymentDescriptor.getSubtaskIndex(),
                    taskDeploymentDescriptor.getAttemptNumber(),
                    taskDeploymentDescriptor.getProducedPartitions(),
                    taskDeploymentDescriptor.getInputGates(),
                    taskDeploymentDescriptor.getTargetSlotNumber(),
                    taskDeploymentDescriptor.getTaskStateHandles(),
                    memoryManager,
                    ioManager,
                    networkEnvironment,
                    broadcastVariableManager,
                    connection.getTaskManagerActions(),
                    new RpcInputSplitProvider(
                            connection.getLeaderId(),
                            connection.getJobManagerGateway(),
                            jobInformation.getJobId(),
                            taskInformation.getJobVertexId(),
                            taskDeploymentDescriptor.getExecutionAttemptId(),
                            taskManagerConfiguration.getTimeout()),
                    connection.getCheckpointResponder(),
                    connection.getLibraryCacheManager(),
                    fileCache,
                    taskManagerConfiguration,
                    taskMetricGroup,
                    connection.getResultPartitionConsumableNotifier(),
                    connection.getPartitionStateChecker(),
                    getRpcService().getExecutor());
            log.info("Received task {}.", newTask.getTaskInfo().getTaskNameWithSubtasks());

            try {
                if (!taskSlotTable.addTask(newTask)) {
                    final String duplicate = "TaskManager already contains a task for id " + newTask.getExecutionId() + '.';
                    log.debug(duplicate);
                    throw new TaskSubmissionException(duplicate);
                }
                // The task is only started after the slot table has accepted it.
                newTask.startTaskThread();
                return Acknowledge.get();
            } catch (SlotNotActiveException | SlotNotFoundException slotFailure) {
                throw new TaskSubmissionException("Could not submit task.", slotFailure);
            }
        } catch (IOException | ClassNotFoundException deserializationFailure) {
            throw new TaskSubmissionException("Could not deserialize the job or task information.", deserializationFailure);
        }
    }

    /**
     * Cancels the execution of the task with the given execution attempt id.
     *
     * @param executionAttemptID id of the task execution to cancel
     * @return an acknowledgement once cancellation has been triggered
     * @throws TaskException if no such task is registered in the task slot table or if
     *     triggering the cancellation fails
     */
    @RpcMethod
    public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) throws TaskException {
        Task task = this.taskSlotTable.getTask(executionAttemptID);
        if (task == null) {
            // The message previously said "to stop", copied from stopTask.
            String str = "Cannot find task to cancel for execution " + executionAttemptID + '.';
            this.log.debug(str);
            throw new TaskException(str);
        }
        try {
            task.cancelExecution();
            return Acknowledge.get();
        } catch (Throwable th) {
            throw new TaskException("Cannot cancel task for execution " + executionAttemptID + '.', th);
        }
    }

    /**
     * Stops the execution of the task with the given execution attempt id.
     *
     * @param executionAttemptID id of the task execution to stop
     * @return an acknowledgement once stopping has been triggered
     * @throws TaskException if no such task is registered in the task slot table or if
     *     triggering the stop fails
     */
    @RpcMethod
    public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) throws TaskException {
        final Task target = taskSlotTable.getTask(executionAttemptID);
        if (target != null) {
            try {
                target.stopExecution();
                return Acknowledge.get();
            } catch (Throwable failure) {
                throw new TaskException("Cannot stop task for execution " + executionAttemptID + '.', failure);
            }
        }

        final String notFound = "Cannot find task to stop for execution " + executionAttemptID + '.';
        log.debug(notFound);
        throw new TaskException(notFound);
    }

    /**
     * Updates the input channels of a task with new partition information.
     *
     * <p>Each update is handed to the executor of the RPC service. If an update fails, the
     * failure is logged and the task is failed externally. Updates for a task that is no
     * longer registered are discarded.
     *
     * @param executionAttemptID id of the task execution whose inputs are updated
     * @param iterable partition infos to apply
     * @return an acknowledgement once all updates have been scheduled or discarded
     * @throws PartitionException if the task has no input gate for one of the partition
     *     infos; updates scheduled before that point are not revoked
     */
    @RpcMethod
    public Acknowledge updatePartitions(final ExecutionAttemptID executionAttemptID, Iterable<PartitionInfo> iterable) throws PartitionException {
        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            for (final PartitionInfo partitionInfo : iterable) {
                final IntermediateDataSetID dataSetId = partitionInfo.getIntermediateDataSetID();
                final SingleInputGate inputGate = task.getInputGateById(dataSetId);
                if (inputGate == null) {
                    throw new PartitionException("No reader with ID " + dataSetId + " for task " + executionAttemptID + " was found.");
                }

                final Runnable channelUpdate = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            inputGate.updateInputChannel(partitionInfo.getInputChannelDeploymentDescriptor());
                        } catch (IOException | InterruptedException updateFailure) {
                            log.error("Could not update input data location for task {}. Trying to fail task.", task.getTaskInfo().getTaskName(), updateFailure);
                            try {
                                task.failExternally(updateFailure);
                            } catch (RuntimeException failTaskFailure) {
                                log.error("Failed canceling task with execution ID {} after task update failure.", executionAttemptID, failTaskFailure);
                            }
                        }
                    }
                };
                getRpcService().execute(channelUpdate);
            }
        } else {
            log.debug("Discard update for input partitions of task {}. Task is no longer running.", executionAttemptID);
        }

        return Acknowledge.get();
    }

    /**
     * Releases the result partitions produced by the given task execution.
     *
     * <p>Anything thrown while releasing is passed to {@code onFatalError}.
     *
     * @param executionAttemptID id of the task execution whose results are discarded
     */
    @RpcMethod
    public void failPartition(ExecutionAttemptID executionAttemptID) {
        log.info("Discarding the results produced by task execution {}.", executionAttemptID);

        try {
            networkEnvironment
                    .getResultPartitionManager()
                    .releasePartitionsProducedBy(executionAttemptID);
        } catch (Throwable releaseFailure) {
            onFatalError(releaseFailure);
        }
    }

    /**
     * Passes a heartbeat request from a job manager to the job manager heartbeat manager,
     * with a {@code null} payload.
     *
     * @param resourceID resource id of the requesting job manager
     */
    @RpcMethod
    public void heartbeatFromJobManager(ResourceID resourceID) {
        jobManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Passes a heartbeat request from the resource manager to the resource manager heartbeat
     * manager, with a {@code null} payload.
     *
     * @param resourceID resource id of the requesting resource manager
     */
    @RpcMethod
    public void heartbeatFromResourceManager(ResourceID resourceID) {
        resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
    }

    /**
     * Triggers a checkpoint barrier on the task with the given execution attempt id.
     *
     * @param executionAttemptID id of the task execution to checkpoint
     * @param j first checkpoint argument, passed on to the task together with {@code j2}
     *     (logged as {@code j@j2})
     * @param j2 second checkpoint argument, passed on to the task
     * @param checkpointOptions options passed on to the task
     * @return an acknowledgement once the barrier has been triggered
     * @throws CheckpointException if no such task is registered in the task slot table
     */
    @RpcMethod
    public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2, CheckpointOptions checkpointOptions) throws CheckpointException {
        log.debug("Trigger checkpoint {}@{} for {}.", j, j2, executionAttemptID);

        final Task target = taskSlotTable.getTask(executionAttemptID);
        if (target == null) {
            final String unknownTask = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
            log.debug(unknownTask);
            throw new CheckpointException(unknownTask);
        }

        target.triggerCheckpointBarrier(j, j2, checkpointOptions);
        return Acknowledge.get();
    }

    /**
     * Notifies the task with the given execution attempt id that a checkpoint completed.
     *
     * @param executionAttemptID id of the task execution to notify
     * @param j value passed on to the task's completion notification (logged as {@code j@j2})
     * @param j2 second checkpoint argument; only used for logging here
     * @return an acknowledgement once the task has been notified
     * @throws CheckpointException if no such task is registered in the task slot table
     */
    @RpcMethod
    public Acknowledge confirmCheckpoint(ExecutionAttemptID executionAttemptID, long j, long j2) throws CheckpointException {
        log.debug("Confirm checkpoint {}@{} for {}.", j, j2, executionAttemptID);

        final Task target = taskSlotTable.getTask(executionAttemptID);
        if (target == null) {
            final String unknownTask = "TaskManager received a checkpoint confirmation for unknown task " + executionAttemptID + '.';
            log.debug(unknownTask);
            throw new CheckpointException(unknownTask);
        }

        target.notifyCheckpointComplete(j);
        return Acknowledge.get();
    }

    /**
     * Handles a slot request from the resource manager.
     *
     * <p>The request is rejected when there is no resource manager connection or when the
     * given leader id differs from the one of the connection. A free slot is allocated for
     * the job; a slot that is already allocated is only accepted when it is allocated for
     * the same job and allocation id. Afterwards the slots are offered to the job manager
     * if one is known for the job, otherwise the job is added to the job leader service.
     *
     * @param slotID id of the requested slot
     * @param jobID job for which the slot is requested
     * @param allocationID allocation id of the request
     * @param str value passed to the job leader service together with the job id
     *     (presumably the job manager address; confirm against the gateway interface)
     * @param uuid leader id of the requesting resource manager
     * @return an acknowledgement if the request was accepted
     * @throws SlotAllocationException if the request is rejected or cannot be fulfilled
     */
    @RpcMethod
    public Acknowledge requestSlot(SlotID slotID, JobID jobID, AllocationID allocationID, String str, UUID uuid) throws SlotAllocationException {
        log.info("Receive slot request {} for job {} from resource manager with leader id {}.", allocationID, jobID, uuid);

        if (resourceManagerConnection == null) {
            final String notConnected = "TaskManager is not connected to a resource manager.";
            log.debug(notConnected);
            throw new SlotAllocationException(notConnected);
        }

        final boolean leaderMatches = resourceManagerConnection.getTargetLeaderId().equals(uuid);
        if (!leaderMatches) {
            final String wrongLeader = "The leader id " + uuid + " does not match with the leader id of the connected resource manager " + resourceManagerConnection.getTargetLeaderId() + '.';
            log.debug(wrongLeader);
            throw new SlotAllocationException(wrongLeader);
        }

        if (!taskSlotTable.isSlotFree(slotID.getSlotNumber())) {
            // An occupied slot is only acceptable if it already belongs to this very request.
            if (!taskSlotTable.isAllocated(slotID.getSlotNumber(), jobID, allocationID)) {
                final String occupied = "The slot " + slotID + " has already been allocated for a different job.";
                log.info(occupied);
                throw new SlotOccupiedException(occupied, taskSlotTable.getCurrentAllocation(slotID.getSlotNumber()));
            }
        } else if (taskSlotTable.allocateSlot(slotID.getSlotNumber(), jobID, allocationID, taskManagerConfiguration.getTimeout())) {
            log.info("Allocated slot for {}.", allocationID);
        } else {
            log.info("Could not allocate slot for {}.", allocationID);
            throw new SlotAllocationException("Could not allocate slot.");
        }

        if (!jobManagerTable.contains(jobID)) {
            try {
                jobLeaderService.addJob(jobID, str);
            } catch (Exception addJobFailure) {
                // Roll the allocation back; failing to do so is treated as fatal.
                try {
                    taskSlotTable.freeSlot(allocationID);
                } catch (SlotNotFoundException slotNotFound) {
                    onFatalError(slotNotFound);
                }
                if (!taskSlotTable.isSlotFree(slotID.getSlotNumber())) {
                    onFatalError(new Exception("Could not free slot " + slotID));
                }
                throw new SlotAllocationException("Could not add job to job leader service.", addJobFailure);
            }
        } else {
            offerSlotsToJobManager(jobID);
        }

        return Acknowledge.get();
    }

    /**
     * RPC method that drops the connection to the JobManager of the given job.
     *
     * <p>Delegates directly to {@code closeJobManagerConnection}.
     *
     * @param jobID job whose JobManager connection is closed
     * @param exc cause that is passed on to {@code closeJobManagerConnection}
     */
    @RpcMethod
    public void disconnectJobManager(JobID jobID, Exception exc) {
        closeJobManagerConnection(jobID, exc);
    }

    /**
     * RPC method that drops the connection to the ResourceManager.
     *
     * <p>Delegates directly to {@code closeResourceManagerConnection}.
     *
     * @param exc cause that is passed on to {@code closeResourceManagerConnection}
     */
    @RpcMethod
    public void disconnectResourceManager(Exception exc) {
        closeResourceManagerConnection(exc);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reacts to a change of the ResourceManager leader.
     *
     * <p>An existing connection (or connection attempt) is logged, closed and cleared. If a leader
     * address is supplied, a new connection to that address is created, stored and started; with a
     * {@code null} address no new connection is created.
     *
     * @param str address of the new ResourceManager leader, or {@code null} when the current
     *     leader lost its leadership
     * @param uuid leader id that is handed to the newly created connection
     */
    public void notifyOfNewResourceManagerLeader(String str, UUID uuid) {
        final boolean hasNewLeader = str != null;
        final TaskExecutorToResourceManagerConnection previous = this.resourceManagerConnection;

        if (previous != null) {
            if (hasNewLeader) {
                this.log.info("ResourceManager leader changed from {} to {}. Registering at new leader.", previous.getTargetAddress(), str);
            } else {
                this.log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.", previous.getTargetAddress());
            }
            // Drop the current connection or connection attempt.
            previous.close();
            this.resourceManagerConnection = null;
        }

        if (!hasNewLeader) {
            return;
        }

        this.log.info("Attempting to register at ResourceManager {}", str);
        final TaskExecutorToResourceManagerConnection fresh = new TaskExecutorToResourceManagerConnection(
                this.log,
                getRpcService(),
                getAddress(),
                getResourceID(),
                this.taskSlotTable.createSlotReport(getResourceID()),
                str,
                uuid,
                getMainThreadExecutor(),
                new ResourceManagerRegistrationListener());
        this.resourceManagerConnection = fresh;
        fresh.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Starts heartbeat monitoring of the ResourceManager identified by the given resource id.
     *
     * <p>The registered target forwards received heartbeats to the gateway of the current
     * ResourceManager connection; heartbeat requests are ignored.
     *
     * @param resourceID resource id under which the ResourceManager is monitored
     */
    public void establishResourceManagerConnection(ResourceID resourceID) {
        final HeartbeatTarget<Void> resourceManagerTarget = new HeartbeatTarget<Void>() {
            @Override
            public void receiveHeartbeat(ResourceID heartbeatOrigin, Void payload) {
                TaskExecutor.this.resourceManagerConnection
                        .getTargetGateway()
                        .heartbeatFromTaskManager(heartbeatOrigin);
            }

            @Override
            public void requestHeartbeat(ResourceID requestOrigin, Void payload) {
                // Intentionally a no-op: heartbeat requests are not acted upon here.
            }
        };

        this.resourceManagerHeartbeatManager.monitorTarget(resourceID, resourceManagerTarget);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the connection to the ResourceManager, if one is currently established.
     *
     * <p>When connected, this stops heartbeat monitoring of the ResourceManager, tells its gateway
     * that this TaskManager disconnects, closes the connection object and clears the field. When
     * not connected, nothing happens.
     *
     * @param exc cause of the disconnect; it is logged and forwarded to the ResourceManager gateway
     */
    public void closeResourceManagerConnection(Exception exc) {
        if (!isConnectedToResourceManager()) {
            return;
        }

        final TaskExecutorToResourceManagerConnection connection = this.resourceManagerConnection;

        this.log.info("Close ResourceManager connection {}.", connection.getResourceManagerId(), exc);
        this.resourceManagerHeartbeatManager.unmonitorTarget(connection.getResourceManagerId());
        connection.getTargetGateway().disconnectTaskManager(getResourceID(), exc);
        connection.close();

        this.resourceManagerConnection = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Offers all slots that are allocated for the given job to the job's JobManager.
     *
     * <p>Nothing is offered when there is no JobManager connection for the job or when the job has
     * no allocated slots. Otherwise every allocated slot is marked active and collected into one
     * offer. Slots that cannot be marked active are reported to the JobManager via
     * {@code failSlot}.
     *
     * <p>The response is handled in the main thread executor:
     * <ul>
     *   <li>if the JobManager connection is no longer valid for the leader id the offer was made
     *       with, the response is discarded;</li>
     *   <li>otherwise every offered slot that is not part of the response is freed;</li>
     *   <li>on a {@link TimeoutException} the offering is retried;</li>
     *   <li>on any other failure all slots still contained in the offer set are freed.</li>
     * </ul>
     *
     * @param jobID job whose allocated slots are offered
     */
    public void offerSlotsToJobManager(final JobID jobID) {
        final JobManagerConnection jobManagerConnection = this.jobManagerTable.get(jobID);
        if (jobManagerConnection == null) {
            this.log.debug("There is no job manager connection to the leader of job {}.", jobID);
            return;
        }
        if (!this.taskSlotTable.hasAllocatedSlots(jobID)) {
            this.log.debug("There are no unassigned slots for the job {}.", jobID);
            return;
        }

        this.log.info("Offer reserved slots to the leader of job {}.", jobID);

        final JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
        final Iterator<TaskSlot> allocatedSlots = this.taskSlotTable.getAllocatedSlots(jobID);
        final UUID leaderId = jobManagerConnection.getLeaderId();
        // Typed set instead of a raw HashSet so that the callbacks below need no casts.
        final HashSet<SlotOffer> offeredSlots = new HashSet<SlotOffer>(2);

        while (allocatedSlots.hasNext()) {
            final SlotOffer offer = allocatedSlots.next().generateSlotOffer();
            final AllocationID allocationId = offer.getAllocationId();
            // The message identifies the slot by its allocation id (it used to print the job id).
            final String failureMessage = "Could not mark slot " + allocationId + " of job " + jobID + " active.";
            try {
                if (!this.taskSlotTable.markSlotActive(allocationId)) {
                    this.log.debug(failureMessage);
                    jobManagerGateway.failSlot(getResourceID(), allocationId, leaderId, new Exception(failureMessage));
                }
                // NOTE(review): the offer is added even when markSlotActive returned false, exactly
                // as before; confirm whether such a slot should really still be offered.
                offeredSlots.add(offer);
            } catch (SlotNotFoundException e) {
                // Keep the root cause instead of silently dropping it.
                this.log.debug(failureMessage, e);
                jobManagerGateway.failSlot(getResourceID(), allocationId, leaderId, new Exception(failureMessage, e));
            }
        }

        jobManagerGateway.offerSlots(getResourceID(), offeredSlots, leaderId, this.taskManagerConfiguration.getTimeout()).thenAcceptAsync(new AcceptFunction<Iterable<SlotOffer>>() {
            @Override
            public void accept(Iterable<SlotOffer> iterable) {
                if (!TaskExecutor.this.isJobManagerConnectionValid(jobID, leaderId)) {
                    TaskExecutor.this.log.debug("Discard offer slot response since there is a new leader for the job {}.", jobID);
                    return;
                }
                // Remove the slots contained in the response; whatever remains is freed below.
                for (SlotOffer acceptedOffer : iterable) {
                    offeredSlots.remove(acceptedOffer);
                }
                final Exception rejection = new Exception("The slot was rejected by the JobManager.");
                for (SlotOffer rejectedOffer : offeredSlots) {
                    TaskExecutor.this.freeSlot(rejectedOffer.getAllocationId(), rejection);
                }
            }
        }, getMainThreadExecutor()).exceptionally(new ApplyFunction<Throwable, Void>() {
            @Override
            public Void apply(Throwable th) {
                if (th instanceof TimeoutException) {
                    TaskExecutor.this.log.info("Slot offering to JobManager did not finish in time. Retrying the slot offering.");
                    TaskExecutor.this.offerSlotsToJobManager(jobID);
                    return null;
                }
                TaskExecutor.this.log.warn("Slot offering to JobManager failed. Freeing the slots and returning them to the ResourceManager.", th);
                for (SlotOffer pendingOffer : offeredSlots) {
                    TaskExecutor.this.freeSlot(pendingOffer.getAllocationId(), th);
                }
                return null;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Establishes the connection to the JobManager of the given job.
     *
     * <p>If a connection for the job already exists under a different leader id, it is closed
     * first. A new connection is then created and registered under both the JobManager's resource
     * id and the job id, heartbeat monitoring of the JobManager is started, and the job's slots
     * are offered to it.
     *
     * @param jobID job the JobManager is responsible for
     * @param jobMasterGateway gateway to the JobManager; also receives the forwarded heartbeats
     * @param uuid leader id of the JobManager
     * @param jMTMRegistrationSuccess registration response providing the JobManager's resource id
     *     and blob port
     */
    public void establishJobManagerConnection(JobID jobID, final JobMasterGateway jobMasterGateway, UUID uuid, JMTMRegistrationSuccess jMTMRegistrationSuccess) {
        this.log.info("Establish JobManager connection for job {}.", jobID);

        if (this.jobManagerTable.contains(jobID)) {
            final JobManagerConnection existing = this.jobManagerTable.get(jobID);
            if (!existing.getLeaderId().equals(uuid)) {
                // A connection under another leader id exists; close it before re-associating.
                closeJobManagerConnection(jobID, new Exception("Found new job leader for job id " + jobID + '.'));
            }
        }

        final ResourceID jobManagerResourceId = jMTMRegistrationSuccess.getResourceID();
        final JobManagerConnection connection = associateWithJobManager(
                jobID,
                jobManagerResourceId,
                jobMasterGateway,
                uuid,
                jMTMRegistrationSuccess.getBlobPort());

        this.jobManagerConnections.put(jobManagerResourceId, connection);
        this.jobManagerTable.put(jobID, connection);

        final HeartbeatTarget<Void> jobManagerTarget = new HeartbeatTarget<Void>() {
            @Override
            public void receiveHeartbeat(ResourceID heartbeatOrigin, Void payload) {
                jobMasterGateway.heartbeatFromTaskManager(heartbeatOrigin);
            }

            @Override
            public void requestHeartbeat(ResourceID requestOrigin, Void payload) {
                // Intentionally a no-op: heartbeat requests are not acted upon here.
            }
        };
        this.jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, jobManagerTarget);

        offerSlotsToJobManager(jobID);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Closes the connection to the JobManager of the given job.
     *
     * <p>All tasks of the job are failed, all of its active slots are marked inactive (a slot
     * that cannot be marked inactive is freed instead), and the connection is removed from the
     * job manager table. If a connection was registered, heartbeat monitoring of the JobManager
     * is stopped, the connection is removed from the resource-id index, and the TaskManager
     * disassociates from the JobManager.
     *
     * @param jobID job whose JobManager connection is closed
     * @param exc cause of the disconnect; forwarded when disassociating from the JobManager
     */
    public void closeJobManagerConnection(JobID jobID, Exception exc) {
        this.log.info("Close JobManager connection for job {}.", jobID);

        final Iterator<Task> tasks = this.taskSlotTable.getTasks(jobID);
        while (tasks.hasNext()) {
            tasks.next().failExternally(new Exception("JobManager responsible for " + jobID + " lost the leadership."));
        }

        final Iterator<AllocationID> activeSlots = this.taskSlotTable.getActiveSlots(jobID);
        while (activeSlots.hasNext()) {
            final AllocationID activeSlot = activeSlots.next();
            try {
                if (!this.taskSlotTable.markSlotInactive(activeSlot, this.taskManagerConfiguration.getTimeout())) {
                    freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
                }
            } catch (SlotNotFoundException e) {
                // Log the allocation id of the affected slot; the job id was logged here before,
                // which made the message point at the wrong entity.
                this.log.debug("Could not mark the slot {} inactive.", activeSlot, e);
            }
        }

        final JobManagerConnection removedConnection = this.jobManagerTable.remove(jobID);
        if (removedConnection != null) {
            try {
                this.jobManagerHeartbeatManager.unmonitorTarget(removedConnection.getResourceID());
                this.jobManagerConnections.remove(removedConnection.getResourceID());
                disassociateFromJobManager(removedConnection, exc);
            } catch (IOException e2) {
                // Best effort: the connection is already deregistered, so only warn.
                this.log.warn("Could not properly disassociate from JobManager {}.", removedConnection.getJobManagerGateway().getAddress(), e2);
            }
        }
    }

    /**
     * Creates the {@code JobManagerConnection} bundling everything needed to talk to a JobManager.
     *
     * <p>The connection holds the task manager actions, the checkpoint responder, a library cache
     * manager backed by a BLOB cache pointing at the JobManager's blob server, the result
     * partition consumable notifier and the partition state checker.
     *
     * @param jobID job the JobManager is responsible for; must not be {@code null}
     * @param resourceID resource id of the JobManager; must not be {@code null}
     * @param jobMasterGateway gateway to the JobManager; must not be {@code null}
     * @param uuid leader id of the JobManager; must not be {@code null}
     * @param i port of the JobManager's blob server; must lie in the range 1..65535 (validated
     *     via {@code Preconditions.checkArgument})
     * @return the newly created connection
     * @throws RuntimeException if the BLOB cache or library cache cannot be created because of an
     *     {@link IOException}
     */
    private JobManagerConnection associateWithJobManager(JobID jobID, ResourceID resourceID, JobMasterGateway jobMasterGateway, UUID uuid, int i) {
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(uuid);
        Preconditions.checkNotNull(jobMasterGateway);
        // Both bounds must hold. The previous "||" made this check a tautology, so out-of-range
        // ports were never rejected here.
        Preconditions.checkArgument(i > 0 && i < 65536, "Blob server port is out of range.");
        try {
            final TaskManagerActionsImpl taskManagerActions = new TaskManagerActionsImpl(uuid, jobMasterGateway);
            final RpcCheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);

            final InetSocketAddress blobServerAddress = new InetSocketAddress(jobMasterGateway.getHostname(), i);
            final BlobCache blobCache = new BlobCache(
                    blobServerAddress,
                    this.taskManagerConfiguration.getConfiguration(),
                    this.haServices.createBlobStore());
            final BlobLibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
                    blobCache,
                    this.taskManagerConfiguration.getCleanupInterval());

            final RpcResultPartitionConsumableNotifier partitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
                    uuid,
                    jobMasterGateway,
                    getRpcService().getExecutor(),
                    this.taskManagerConfiguration.getTimeout());
            final RpcPartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(uuid, jobMasterGateway);

            return new JobManagerConnection(
                    jobID,
                    resourceID,
                    jobMasterGateway,
                    uuid,
                    taskManagerActions,
                    checkpointResponder,
                    libraryCacheManager,
                    partitionConsumableNotifier,
                    partitionStateChecker);
        } catch (IOException e) {
            this.log.error("Could not create BLOB cache or library cache.", e);
            throw new RuntimeException("Could not create BLOB cache or library cache.", e);
        }
    }

    /**
     * Tears down the association with a JobManager.
     *
     * @param jobManagerConnection connection to dissolve; must not be {@code null}
     * @param exc cause of the disconnect, forwarded to the JobManager gateway
     * @throws IOException declared by the signature; presumably raised by the library cache
     *     manager shutdown (confirm against its declaration)
     */
    private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception exc) throws IOException {
        Preconditions.checkNotNull(jobManagerConnection);
        // Tell the JobManager that this TaskManager disconnects, passing on the cause.
        jobManagerConnection.getJobManagerGateway().disconnectTaskManager(getResourceID(), exc);
        // Shut down the library cache manager that belongs to this connection.
        jobManagerConnection.getLibraryCacheManager().shutdown();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fails the task that belongs to the given execution attempt.
     *
     * <p>If no such task is registered in the task slot table, this is only logged at debug
     * level. Anything thrown while failing the task is caught and logged as an error.
     *
     * @param executionAttemptID execution attempt whose task is failed
     * @param th cause handed to the task
     */
    public void failTask(ExecutionAttemptID executionAttemptID, Throwable th) {
        final Task taskToFail = this.taskSlotTable.getTask(executionAttemptID);

        if (taskToFail != null) {
            try {
                taskToFail.failExternally(th);
            } catch (Throwable failure) {
                this.log.error("Could not fail task {}.", executionAttemptID, failure);
            }
        } else {
            this.log.debug("Cannot find task to fail for execution {}.", executionAttemptID);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Sends a task execution state update to the JobManager.
     *
     * <p>If the update completes exceptionally, the affected task is failed with that exception;
     * this handler runs in the main thread executor.
     *
     * @param uuid leader id passed along with the update
     * @param jobMasterGateway gateway the update is sent to
     * @param taskExecutionState state to report
     */
    public void updateTaskExecutionState(UUID uuid, JobMasterGateway jobMasterGateway, TaskExecutionState taskExecutionState) {
        final ExecutionAttemptID executionAttemptId = taskExecutionState.getID();

        final ApplyFunction<Throwable, Void> failTaskOnError = new ApplyFunction<Throwable, Void>() {
            @Override
            public Void apply(Throwable updateFailure) {
                TaskExecutor.this.failTask(executionAttemptId, updateFailure);
                return null;
            }
        };

        jobMasterGateway
                .updateTaskExecutionState(uuid, taskExecutionState)
                .exceptionallyAsync(failTaskOnError, getMainThreadExecutor());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes a task from the task slot table and reports its final state to the JobManager.
     *
     * <p>If the task is unknown, an error is logged and nothing is reported. A task that is not
     * yet in a terminal state is failed before its state is reported.
     *
     * @param uuid leader id passed along with the state update
     * @param jobMasterGateway gateway the final state is sent to
     * @param executionAttemptID execution attempt of the task to unregister
     */
    public void unregisterTaskAndNotifyFinalState(UUID uuid, JobMasterGateway jobMasterGateway, ExecutionAttemptID executionAttemptID) {
        final Task task = this.taskSlotTable.removeTask(executionAttemptID);
        if (task == null) {
            this.log.error("Cannot find task with ID {} to unregister.", executionAttemptID);
            return;
        }

        final boolean alreadyTerminal = task.getExecutionState().isTerminal();
        if (!alreadyTerminal) {
            try {
                task.failExternally(new IllegalStateException("Task is being remove from TaskManager."));
            } catch (Exception failure) {
                this.log.error("Could not properly fail task.", failure);
            }
        }

        final Object[] logArguments = {
            task.getExecutionState(),
            task.getTaskInfo().getTaskName(),
            task.getExecutionId()
        };
        this.log.info("Un-registering task and sending final execution state {} to JobManager for task {} {}.", logArguments);

        final TaskExecutionState finalState = new TaskExecutionState(
                task.getJobID(),
                task.getExecutionId(),
                task.getExecutionState(),
                task.getFailureCause(),
                task.getAccumulatorRegistry().getSnapshot(),
                task.getMetricGroup().getIOMetricGroup().createSnapshot());
        updateTaskExecutionState(uuid, jobMasterGateway, finalState);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Frees the slot with the given allocation id and, where applicable, tells the
     * ResourceManager that the slot is available again.
     *
     * <p>The ResourceManager is only notified when the task slot table returns a slot index
     * other than {@code -1} and a ResourceManager connection is established. A
     * {@code SlotNotFoundException} is logged at debug level and otherwise ignored.
     *
     * @param allocationID allocation id of the slot to free; must not be {@code null}
     * @param th cause passed on to the task slot table
     */
    public void freeSlot(AllocationID allocationID, Throwable th) {
        Preconditions.checkNotNull(allocationID);
        try {
            final int slotIndex = this.taskSlotTable.freeSlot(allocationID, th);
            if (slotIndex == -1) {
                return;
            }
            if (!isConnectedToResourceManager()) {
                return;
            }
            final TaskExecutorToResourceManagerConnection connection = this.resourceManagerConnection;
            connection.getTargetGateway().notifySlotAvailable(
                    connection.getTargetLeaderId(),
                    connection.getRegistrationId(),
                    new SlotID(getResourceID(), slotIndex),
                    allocationID);
        } catch (SlotNotFoundException notFound) {
            this.log.debug("Could not free slot for allocation id {}.", allocationID, notFound);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Frees the slot with the given allocation id, using a generic exception as the cause.
     *
     * @param allocationID allocation id of the slot to free
     */
    public void freeSlot(AllocationID allocationID) {
        final Exception cause = new Exception("The slot " + allocationID + " is beeing freed.");
        freeSlot(allocationID, cause);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a slot timeout: frees the slot if the timeout is still valid for it.
     *
     * <p>A timeout that the task slot table does not consider valid is only logged at debug level.
     *
     * @param allocationID allocation id of the slot that timed out; must not be {@code null}
     * @param uuid ticket identifying the timeout; must not be {@code null}
     */
    public void timeoutSlot(AllocationID allocationID, UUID uuid) {
        Preconditions.checkNotNull(allocationID);
        Preconditions.checkNotNull(uuid);

        if (!this.taskSlotTable.isValidTimeout(allocationID, uuid)) {
            this.log.debug("Received an invalid timeout for allocation id {} with ticket {}.", allocationID, uuid);
            return;
        }

        freeSlot(allocationID, new Exception("The slot " + allocationID + " has timed out."));
    }

    /**
     * Tells whether a ResourceManager connection exists and reports itself as connected.
     *
     * @return {@code true} if the connection field is set and {@code isConnected()} holds
     */
    private boolean isConnectedToResourceManager() {
        if (this.resourceManagerConnection == null) {
            return false;
        }
        return this.resourceManagerConnection.isConnected();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks whether the connection registered for a job still belongs to the given leader.
     *
     * @param jobID job whose connection is looked up
     * @param uuid leader id the connection is expected to carry
     * @return {@code true} if a connection is registered for the job and its leader id equals
     *     {@code uuid}; {@code false} otherwise
     */
    public boolean isJobManagerConnectionValid(JobID jobID, UUID uuid) {
        final JobManagerConnection registered = this.jobManagerTable.get(jobID);
        if (registered == null) {
            return false;
        }
        return Objects.equals(registered.getLeaderId(), uuid);
    }

    /**
     * Returns the resource id of this TaskExecutor, as stored in its task manager location.
     *
     * @return the resource id taken from {@code taskManagerLocation}
     */
    public ResourceID getResourceID() {
        return this.taskManagerLocation.getResourceID();
    }

    /**
     * Schedules the fatal error handling via {@code runAsync} instead of running it inline.
     *
     * @param th the fatal error to report
     */
    void onFatalErrorAsync(final Throwable th) {
        final Runnable reportFatalError = new Runnable() {
            @Override
            public void run() {
                TaskExecutor.this.onFatalError(th);
            }
        };
        runAsync(reportFatalError);
    }

    /**
     * Logs a fatal error and hands it to the configured fatal error handler.
     *
     * @param th the fatal error
     */
    void onFatalError(Throwable th) {
        this.log.error("Fatal error occurred.", th);
        this.fatalErrorHandler.onFatalError(th);
    }

    /**
     * Exposes the current ResourceManager connection for tests.
     *
     * @return the value of the connection field; other code in this class sets it to
     *     {@code null} when no connection exists
     */
    @VisibleForTesting
    TaskExecutorToResourceManagerConnection getResourceManagerConnection() {
        return this.resourceManagerConnection;
    }
}
