package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.SafetyNetCloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionMetrics;
import org.apache.flink.runtime.io.network.partition.consumer.InputGateMetrics;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.WrappingRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task.class */
public class Task implements Runnable, TaskActions {
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");
    private final JobID jobId;
    private final JobVertexID vertexId;
    private final ExecutionAttemptID executionId;
    private final AllocationID allocationId;
    private final TaskInfo taskInfo;
    private final String taskNameWithSubtask;
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    private final Collection<BlobKey> requiredJarFiles;
    private final Collection<URL> requiredClasspaths;
    private final String nameOfInvokableClass;
    private final TaskManagerRuntimeInfo taskManagerConfig;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final BroadcastVariableManager broadcastVariableManager;
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
    private final ResultPartition[] producedPartitions;
    private final ResultPartitionWriter[] writers;
    private final SingleInputGate[] inputGates;
    private final Map<IntermediateDataSetID, SingleInputGate> inputGatesById;
    private final TaskManagerActions taskManagerActions;
    private final InputSplitProvider inputSplitProvider;
    private final CheckpointResponder checkpointResponder;
    private final List<TaskExecutionStateListener> taskExecutionStateListeners;
    private final LibraryCacheManager libraryCache;
    private final FileCache fileCache;
    private final NetworkEnvironment network;
    private final AccumulatorRegistry accumulatorRegistry;
    private final Thread executingThread;
    private final TaskMetricGroup metrics;
    private final PartitionProducerStateChecker partitionProducerStateChecker;
    private final Executor executor;
    private final AtomicBoolean invokableHasBeenCanceled;
    private volatile AbstractInvokable invokable;
    private volatile ExecutionState executionState = ExecutionState.CREATED;
    private volatile Throwable failureCause;
    private volatile ExecutorService asyncCallDispatcher;
    private volatile TaskStateHandles taskStateHandles;
    private long taskCancellationInterval;
    private long taskCancellationTimeout;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task$TaskCanceler.class */
    public static class TaskCanceler implements Runnable {
        private final Logger logger;
        private final AbstractInvokable invokable;
        private final Thread executer;
        private final String taskName;
        private final ResultPartition[] producedPartitions;
        private final SingleInputGate[] inputGates;
        private final long interruptInterval;
        private final long interruptTimeout;
        private final TaskManagerActions taskManager;

        @Nullable
        private final Thread watchDogThread;

        /* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task$TaskCanceler$TaskCancelerWatchDog.class */
        private class TaskCancelerWatchDog implements Runnable {
            private TaskCancelerWatchDog() {
            }

            @Override // java.lang.Runnable
            public void run() {
                long convert = TimeUnit.NANOSECONDS.convert(TaskCanceler.this.interruptInterval, TimeUnit.MILLISECONDS);
                long nanoTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(TaskCanceler.this.interruptTimeout, TimeUnit.MILLISECONDS);
                try {
                    Thread.sleep(TaskCanceler.this.interruptInterval);
                } catch (InterruptedException e) {
                }
                while (TaskCanceler.this.executer.isAlive()) {
                    long nanoTime2 = System.nanoTime();
                    StringBuilder sb = new StringBuilder();
                    for (StackTraceElement stackTraceElement : TaskCanceler.this.executer.getStackTrace()) {
                        sb.append(stackTraceElement).append('\n');
                    }
                    if (nanoTime2 >= nanoTime) {
                        String format = String.format("Task '%s' did not react to cancelling signal in the last %d seconds, but is stuck in method:\n %s", TaskCanceler.this.taskName, Long.valueOf(TimeUnit.SECONDS.convert(TaskCanceler.this.interruptInterval, TimeUnit.MILLISECONDS)), sb.toString());
                        TaskCanceler.this.logger.info("Notifying TaskManager about fatal error. {}.", format);
                        TaskCanceler.this.taskManager.notifyFatalError(format, null);
                        return;
                    } else {
                        TaskCanceler.this.logger.warn("Task '{}' did not react to cancelling signal, but is stuck in method:\n {}", TaskCanceler.this.taskName, sb.toString());
                        TaskCanceler.this.executer.interrupt();
                        try {
                            long convert2 = TimeUnit.MILLISECONDS.convert(Math.min(convert, nanoTime - nanoTime2), TimeUnit.NANOSECONDS);
                            if (convert2 > 0) {
                                TaskCanceler.this.executer.join(convert2);
                            }
                        } catch (InterruptedException e2) {
                        }
                    }
                }
            }
        }

        public TaskCanceler(Logger logger, AbstractInvokable abstractInvokable, Thread thread, String str, long j, long j2, TaskManagerActions taskManagerActions, ResultPartition[] resultPartitionArr, SingleInputGate[] singleInputGateArr) {
            this.logger = logger;
            this.invokable = abstractInvokable;
            this.executer = thread;
            this.taskName = str;
            this.interruptInterval = j;
            this.interruptTimeout = j2;
            this.taskManager = taskManagerActions;
            this.producedPartitions = resultPartitionArr;
            this.inputGates = singleInputGateArr;
            if (j2 <= 0) {
                this.watchDogThread = null;
            } else {
                this.watchDogThread = new Thread(thread.getThreadGroup(), new TaskCancelerWatchDog(), "WatchDog for " + str + " cancellation");
                this.watchDogThread.setDaemon(true);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (this.watchDogThread != null) {
                    this.watchDogThread.start();
                }
                try {
                    this.invokable.cancel();
                } catch (Throwable th) {
                    this.logger.error("Error while canceling the task {}.", this.taskName, th);
                }
                for (ResultPartition resultPartition : this.producedPartitions) {
                    try {
                        resultPartition.destroyBufferPool();
                    } catch (Throwable th2) {
                        Task.LOG.error("Failed to release result partition buffer pool for task {}.", this.taskName, th2);
                    }
                }
                for (SingleInputGate singleInputGate : this.inputGates) {
                    try {
                        singleInputGate.releaseAllResources();
                    } catch (Throwable th3) {
                        Task.LOG.error("Failed to release input gate for task {}.", this.taskName, th3);
                    }
                }
                this.executer.interrupt();
                try {
                    this.executer.join(this.interruptInterval);
                } catch (InterruptedException e) {
                }
                if (this.watchDogThread != null) {
                    this.watchDogThread.interrupt();
                    this.watchDogThread.join();
                }
            } catch (Throwable th4) {
                this.logger.error("Error in the task canceler for task {}.", this.taskName, th4);
            }
        }
    }

    public Task(JobInformation jobInformation, TaskInformation taskInformation, ExecutionAttemptID executionAttemptID, AllocationID allocationID, int i, int i2, Collection<ResultPartitionDeploymentDescriptor> collection, Collection<InputGateDeploymentDescriptor> collection2, int i3, TaskStateHandles taskStateHandles, MemoryManager memoryManager, IOManager iOManager, NetworkEnvironment networkEnvironment, BroadcastVariableManager broadcastVariableManager, TaskManagerActions taskManagerActions, InputSplitProvider inputSplitProvider, CheckpointResponder checkpointResponder, LibraryCacheManager libraryCacheManager, FileCache fileCache, TaskManagerRuntimeInfo taskManagerRuntimeInfo, @Nonnull TaskMetricGroup taskMetricGroup, ResultPartitionConsumableNotifier resultPartitionConsumableNotifier, PartitionProducerStateChecker partitionProducerStateChecker, Executor executor) {
        Preconditions.checkNotNull(jobInformation);
        Preconditions.checkNotNull(taskInformation);
        Preconditions.checkArgument(0 <= i, "The subtask index must be positive.");
        Preconditions.checkArgument(0 <= i2, "The attempt number must be positive.");
        Preconditions.checkArgument(0 <= i3, "The target slot number must be positive.");
        this.taskInfo = new TaskInfo(taskInformation.getTaskName(), taskInformation.getMaxNumberOfSubtaks(), i, taskInformation.getNumberOfSubtasks(), i2);
        this.jobId = jobInformation.getJobId();
        this.vertexId = taskInformation.getJobVertexId();
        this.executionId = (ExecutionAttemptID) Preconditions.checkNotNull(executionAttemptID);
        this.allocationId = (AllocationID) Preconditions.checkNotNull(allocationID);
        this.taskNameWithSubtask = this.taskInfo.getTaskNameWithSubtasks();
        this.jobConfiguration = jobInformation.getJobConfiguration();
        this.taskConfiguration = taskInformation.getTaskConfiguration();
        this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
        this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
        this.nameOfInvokableClass = taskInformation.getInvokableClassName();
        this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
        this.taskStateHandles = taskStateHandles;
        Configuration configuration = taskManagerRuntimeInfo.getConfiguration();
        this.taskCancellationInterval = configuration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
        this.taskCancellationTimeout = configuration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);
        this.memoryManager = (MemoryManager) Preconditions.checkNotNull(memoryManager);
        this.ioManager = (IOManager) Preconditions.checkNotNull(iOManager);
        this.broadcastVariableManager = (BroadcastVariableManager) Preconditions.checkNotNull(broadcastVariableManager);
        this.accumulatorRegistry = new AccumulatorRegistry(this.jobId, this.executionId);
        this.inputSplitProvider = (InputSplitProvider) Preconditions.checkNotNull(inputSplitProvider);
        this.checkpointResponder = (CheckpointResponder) Preconditions.checkNotNull(checkpointResponder);
        this.taskManagerActions = (TaskManagerActions) Preconditions.checkNotNull(taskManagerActions);
        this.libraryCache = (LibraryCacheManager) Preconditions.checkNotNull(libraryCacheManager);
        this.fileCache = (FileCache) Preconditions.checkNotNull(fileCache);
        this.network = (NetworkEnvironment) Preconditions.checkNotNull(networkEnvironment);
        this.taskManagerConfig = (TaskManagerRuntimeInfo) Preconditions.checkNotNull(taskManagerRuntimeInfo);
        this.taskExecutionStateListeners = new CopyOnWriteArrayList();
        this.metrics = taskMetricGroup;
        this.partitionProducerStateChecker = (PartitionProducerStateChecker) Preconditions.checkNotNull(partitionProducerStateChecker);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        String str = this.taskNameWithSubtask + " (" + this.executionId + ')';
        this.producedPartitions = new ResultPartition[collection.size()];
        this.writers = new ResultPartitionWriter[collection.size()];
        int i4 = 0;
        for (ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor : collection) {
            this.producedPartitions[i4] = new ResultPartition(str, this, this.jobId, new ResultPartitionID(resultPartitionDeploymentDescriptor.getPartitionId(), this.executionId), resultPartitionDeploymentDescriptor.getPartitionType(), resultPartitionDeploymentDescriptor.getNumberOfSubpartitions(), resultPartitionDeploymentDescriptor.getMaxParallelism(), networkEnvironment.getResultPartitionManager(), resultPartitionConsumableNotifier, iOManager, resultPartitionDeploymentDescriptor.sendScheduleOrUpdateConsumersMessage());
            this.writers[i4] = new ResultPartitionWriter(this.producedPartitions[i4]);
            i4++;
        }
        this.inputGates = new SingleInputGate[collection2.size()];
        this.inputGatesById = new HashMap();
        int i5 = 0;
        Iterator<InputGateDeploymentDescriptor> it = collection2.iterator();
        while (it.hasNext()) {
            SingleInputGate create = SingleInputGate.create(str, this.jobId, this.executionId, it.next(), networkEnvironment, this, taskMetricGroup.getIOMetricGroup());
            this.inputGates[i5] = create;
            this.inputGatesById.put(create.getConsumedResultId(), create);
            i5++;
        }
        this.invokableHasBeenCanceled = new AtomicBoolean(false);
        this.executingThread = new Thread(TASK_THREADS_GROUP, this, this.taskNameWithSubtask);
    }

    public JobID getJobID() {
        return this.jobId;
    }

    public JobVertexID getJobVertexId() {
        return this.vertexId;
    }

    public ExecutionAttemptID getExecutionId() {
        return this.executionId;
    }

    public AllocationID getAllocationId() {
        return this.allocationId;
    }

    public TaskInfo getTaskInfo() {
        return this.taskInfo;
    }

    public Configuration getJobConfiguration() {
        return this.jobConfiguration;
    }

    public Configuration getTaskConfiguration() {
        return this.taskConfiguration;
    }

    public ResultPartitionWriter[] getAllWriters() {
        return this.writers;
    }

    public SingleInputGate[] getAllInputGates() {
        return this.inputGates;
    }

    public ResultPartition[] getProducedPartitions() {
        return this.producedPartitions;
    }

    public SingleInputGate getInputGateById(IntermediateDataSetID intermediateDataSetID) {
        return this.inputGatesById.get(intermediateDataSetID);
    }

    public AccumulatorRegistry getAccumulatorRegistry() {
        return this.accumulatorRegistry;
    }

    public TaskMetricGroup getMetricGroup() {
        return this.metrics;
    }

    public Thread getExecutingThread() {
        return this.executingThread;
    }

    @VisibleForTesting
    long getTaskCancellationInterval() {
        return this.taskCancellationInterval;
    }

    @VisibleForTesting
    long getTaskCancellationTimeout() {
        return this.taskCancellationTimeout;
    }

    public ExecutionState getExecutionState() {
        return this.executionState;
    }

    public boolean isCanceledOrFailed() {
        return this.executionState == ExecutionState.CANCELING || this.executionState == ExecutionState.CANCELED || this.executionState == ExecutionState.FAILED;
    }

    public Throwable getFailureCause() {
        return this.failureCause;
    }

    public void startTaskThread() {
        this.executingThread.start();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            ExecutionState executionState = this.executionState;
            if (executionState == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    Map<String, Future<Path>> hashMap = new HashMap<>();
                    try {
                        try {
                            LOG.info("Creating FileSystem stream leak safety net for task {}", this);
                            FileSystemSafetyNet.initializeSafetyNetForThread();
                            LOG.info("Loading JAR files for task {}.", this);
                            ClassLoader createUserCodeClassloader = createUserCodeClassloader(this.libraryCache);
                            ExecutionConfig deserializeValue = this.serializedExecutionConfig.deserializeValue(createUserCodeClassloader);
                            if (deserializeValue.getTaskCancellationInterval() >= 0) {
                                this.taskCancellationInterval = deserializeValue.getTaskCancellationInterval();
                            }
                            if (deserializeValue.getTaskCancellationTimeout() >= 0) {
                                this.taskCancellationTimeout = deserializeValue.getTaskCancellationTimeout();
                            }
                            AbstractInvokable loadAndInstantiateInvokable = loadAndInstantiateInvokable(createUserCodeClassloader, this.nameOfInvokableClass);
                            if (isCanceledOrFailed()) {
                                throw new CancelTaskException();
                            }
                            LOG.info("Registering task at network: {}.", this);
                            this.network.registerTask(this);
                            this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
                            if (this.taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
                                MetricGroup addGroup = this.metrics.getIOMetricGroup().addGroup("Network");
                                MetricGroup addGroup2 = addGroup.addGroup("Output");
                                MetricGroup addGroup3 = addGroup.addGroup("Input");
                                for (int i = 0; i < this.producedPartitions.length; i++) {
                                    ResultPartitionMetrics.registerQueueLengthMetrics(addGroup2.addGroup(i), this.producedPartitions[i]);
                                }
                                for (int i2 = 0; i2 < this.inputGates.length; i2++) {
                                    InputGateMetrics.registerQueueLengthMetrics(addGroup3.addGroup(i2), this.inputGates[i2]);
                                }
                            }
                            try {
                                for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : DistributedCache.readFileInfoFromConfig(this.jobConfiguration)) {
                                    LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                                    hashMap.put(entry.getKey(), this.fileCache.createTmpFile(entry.getKey(), entry.getValue(), this.jobId));
                                }
                                if (isCanceledOrFailed()) {
                                    throw new CancelTaskException();
                                }
                                loadAndInstantiateInvokable.setEnvironment(new RuntimeEnvironment(this.jobId, this.vertexId, this.executionId, deserializeValue, this.taskInfo, this.jobConfiguration, this.taskConfiguration, createUserCodeClassloader, this.memoryManager, this.ioManager, this.broadcastVariableManager, this.accumulatorRegistry, this.network.createKvStateTaskRegistry(this.jobId, getJobVertexId()), this.inputSplitProvider, hashMap, this.writers, this.inputGates, this.checkpointResponder, this.taskManagerConfig, this.metrics, this));
                                if (null != this.taskStateHandles) {
                                    if (!(loadAndInstantiateInvokable instanceof StatefulTask)) {
                                        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
                                    }
                                    ((StatefulTask) loadAndInstantiateInvokable).setInitialState(this.taskStateHandles);
                                    this.taskStateHandles = null;
                                }
                                this.invokable = loadAndInstantiateInvokable;
                                if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                                    throw new CancelTaskException();
                                }
                                notifyObservers(ExecutionState.RUNNING, null);
                                this.taskManagerActions.updateTaskExecutionState(new TaskExecutionState(this.jobId, this.executionId, ExecutionState.RUNNING));
                                this.executingThread.setContextClassLoader(createUserCodeClassloader);
                                loadAndInstantiateInvokable.invoke();
                                if (isCanceledOrFailed()) {
                                    throw new CancelTaskException();
                                }
                                for (ResultPartition resultPartition : this.producedPartitions) {
                                    if (resultPartition != null) {
                                        resultPartition.finish();
                                    }
                                }
                                if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                                    throw new CancelTaskException();
                                }
                                notifyObservers(ExecutionState.FINISHED, null);
                                try {
                                    LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                                    ExecutorService executorService = this.asyncCallDispatcher;
                                    if (executorService != null && !executorService.isShutdown()) {
                                        executorService.shutdownNow();
                                    }
                                    this.network.unregisterTask(this);
                                    if (loadAndInstantiateInvokable != 0) {
                                        this.memoryManager.releaseAll(loadAndInstantiateInvokable);
                                    }
                                    this.libraryCache.unregisterTask(this.jobId, this.executionId);
                                    removeCachedFiles(hashMap, this.fileCache);
                                    LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
                                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                    notifyFinalState();
                                } catch (Throwable th) {
                                    String format = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                    LOG.error(format, th);
                                    notifyFatalError(format, th);
                                }
                                try {
                                    this.metrics.close();
                                    return;
                                } catch (Throwable th2) {
                                    LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th2});
                                    return;
                                }
                            } catch (Exception e) {
                                throw new Exception(String.format("Exception while adding files to distributed cache of task %s (%s).", this.taskNameWithSubtask, this.executionId), e);
                            }
                        } catch (Throwable th3) {
                            try {
                                LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                                ExecutorService executorService2 = this.asyncCallDispatcher;
                                if (executorService2 != null && !executorService2.isShutdown()) {
                                    executorService2.shutdownNow();
                                }
                                this.network.unregisterTask(this);
                                if (0 != 0) {
                                    this.memoryManager.releaseAll(null);
                                }
                                this.libraryCache.unregisterTask(this.jobId, this.executionId);
                                removeCachedFiles(hashMap, this.fileCache);
                                LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
                                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                notifyFinalState();
                            } catch (Throwable th4) {
                                String format2 = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                LOG.error(format2, th4);
                                notifyFatalError(format2, th4);
                            }
                            try {
                                this.metrics.close();
                            } catch (Throwable th5) {
                                LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th5});
                            }
                            throw th3;
                        }
                    } catch (Throwable th6) {
                        th = th6;
                        if (th instanceof WrappingRuntimeException) {
                            th = ((WrappingRuntimeException) th).unwrap();
                        }
                        try {
                            if (ExceptionUtils.isJvmFatalError(th) || ((th instanceof OutOfMemoryError) && this.taskManagerConfig.shouldExitJvmOnOutOfMemoryError())) {
                                try {
                                    LOG.error("Encountered fatal error {} - terminating the JVM", th.getClass().getName(), th);
                                    Runtime.getRuntime().halt(-1);
                                } catch (Throwable th7) {
                                    Runtime.getRuntime().halt(-1);
                                    throw th7;
                                }
                            }
                            while (true) {
                                ExecutionState executionState2 = this.executionState;
                                if (executionState2 != ExecutionState.RUNNING && executionState2 != ExecutionState.DEPLOYING) {
                                    if (executionState2 == ExecutionState.CANCELING) {
                                        if (transitionState(executionState2, ExecutionState.CANCELED)) {
                                            notifyObservers(ExecutionState.CANCELED, null);
                                            break;
                                        }
                                    } else if (executionState2 != ExecutionState.FAILED) {
                                        if (transitionState(executionState2, ExecutionState.FAILED, th)) {
                                            LOG.error("Unexpected state in task {} ({}) during an exception: {}.", new Object[]{this.taskNameWithSubtask, this.executionId, executionState2});
                                            break;
                                        }
                                    } else {
                                        break;
                                    }
                                } else if (!(th instanceof CancelTaskException)) {
                                    if (transitionState(executionState2, ExecutionState.FAILED, th)) {
                                        String format3 = String.format("Execution of %s (%s) failed.", this.taskNameWithSubtask, this.executionId);
                                        this.failureCause = th;
                                        cancelInvokable();
                                        notifyObservers(ExecutionState.FAILED, new Exception(format3, th));
                                        break;
                                    }
                                } else {
                                    if (transitionState(executionState2, ExecutionState.CANCELED)) {
                                        cancelInvokable();
                                        notifyObservers(ExecutionState.CANCELED, null);
                                        break;
                                    }
                                }
                            }
                        } catch (Throwable th8) {
                            String format4 = String.format("FATAL - exception in exception handler of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                            LOG.error(format4, th8);
                            notifyFatalError(format4, th8);
                        }
                        try {
                            LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                            ExecutorService executorService3 = this.asyncCallDispatcher;
                            if (executorService3 != null && !executorService3.isShutdown()) {
                                executorService3.shutdownNow();
                            }
                            this.network.unregisterTask(this);
                            if (0 != 0) {
                                this.memoryManager.releaseAll(null);
                            }
                            this.libraryCache.unregisterTask(this.jobId, this.executionId);
                            removeCachedFiles(hashMap, this.fileCache);
                            LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
                            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                            notifyFinalState();
                        } catch (Throwable th9) {
                            String format5 = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                            LOG.error(format5, th9);
                            notifyFatalError(format5, th9);
                        }
                        try {
                            this.metrics.close();
                            return;
                        } catch (Throwable th10) {
                            LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th10});
                            return;
                        }
                    }
                }
            } else {
                if (executionState == ExecutionState.FAILED) {
                    notifyFinalState();
                    if (this.metrics != null) {
                        this.metrics.close();
                        return;
                    }
                    return;
                }
                if (executionState != ExecutionState.CANCELING) {
                    if (this.metrics != null) {
                        this.metrics.close();
                    }
                    throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
                }
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    notifyFinalState();
                    if (this.metrics != null) {
                        this.metrics.close();
                        return;
                    }
                    return;
                }
            }
        }
    }

    private ClassLoader createUserCodeClassloader(LibraryCacheManager libraryCacheManager) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        libraryCacheManager.registerTask(this.jobId, this.executionId, this.requiredJarFiles, this.requiredClasspaths);
        LOG.debug("Register task {} at library cache manager took {} milliseconds", this.executionId, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        ClassLoader classLoader = libraryCacheManager.getClassLoader(this.jobId);
        if (classLoader == null) {
            throw new Exception("No user code classloader available.");
        }
        return classLoader;
    }

    private AbstractInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String str) throws Exception {
        try {
            try {
                return (AbstractInvokable) Class.forName(str, true, classLoader).asSubclass(AbstractInvokable.class).newInstance();
            } catch (Throwable th) {
                throw new Exception("Could not instantiate the task's invokable class.", th);
            }
        } catch (Throwable th2) {
            throw new Exception("Could not load the task's invokable class.", th2);
        }
    }

    private void removeCachedFiles(Map<String, Future<Path>> map, FileCache fileCache) {
        try {
            Iterator<Map.Entry<String, Future<Path>>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                String key = it.next().getKey();
                try {
                    fileCache.deleteTmpFile(key, this.jobId);
                } catch (Exception e) {
                    LOG.error("Distributed Cache could not remove cached file registered under '" + key + "'.", e);
                }
            }
        } catch (Throwable th) {
            LOG.error("Error while removing cached local files from distributed cache.");
        }
    }

    private void notifyFinalState() {
        this.taskManagerActions.notifyFinalState(this.executionId);
    }

    private void notifyFatalError(String str, Throwable th) {
        this.taskManagerActions.notifyFatalError(str, th);
    }

    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2) {
        return transitionState(executionState, executionState2, null);
    }

    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2, Throwable th) {
        if (!STATE_UPDATER.compareAndSet(this, executionState, executionState2)) {
            return false;
        }
        if (th == null) {
            LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.taskNameWithSubtask, this.executionId, executionState, executionState2});
            return true;
        }
        LOG.info("{} ({}) switched from {} to {}.", new Object[]{this.taskNameWithSubtask, this.executionId, executionState, executionState2, th});
        return true;
    }

    public void stopExecution() throws UnsupportedOperationException {
        LOG.info("Attempting to stop task {} ({}).", this.taskNameWithSubtask, this.executionId);
        if (!(this.invokable instanceof StoppableTask)) {
            throw new UnsupportedOperationException(String.format("Stopping not supported by task %s (%s).", this.taskNameWithSubtask, this.executionId));
        }
        executeAsyncCallRunnable(new Runnable() { // from class: org.apache.flink.runtime.taskmanager.Task.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ((StoppableTask) Task.this.invokable).stop();
                } catch (RuntimeException e) {
                    Task.LOG.error("Stopping task {} ({}) failed.", new Object[]{Task.this.taskNameWithSubtask, Task.this.executionId, e});
                    Task.this.taskManagerActions.failTask(Task.this.executionId, e);
                }
            }
        }, String.format("Stopping source task %s (%s).", this.taskNameWithSubtask, this.executionId));
    }

    public void cancelExecution() {
        LOG.info("Attempting to cancel task {} ({}).", this.taskNameWithSubtask, this.executionId);
        cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, null);
    }

    @Override // org.apache.flink.runtime.taskmanager.TaskActions
    public void failExternally(Throwable th) {
        LOG.info("Attempting to fail task externally {} ({}).", this.taskNameWithSubtask, this.executionId);
        cancelOrFailAndCancelInvokable(ExecutionState.FAILED, th);
    }

    /* JADX WARN: Code restructure failed: missing block: B:38:0x0013, code lost:
    
        org.apache.flink.runtime.taskmanager.Task.LOG.info("Task {} is already in state {}", r14.taskNameWithSubtask, r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x0023, code lost:
    
        return;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void cancelOrFailAndCancelInvokable(org.apache.flink.runtime.execution.ExecutionState r15, java.lang.Throwable r16) {
        /*
            Method dump skipped, instructions count: 339
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.taskmanager.Task.cancelOrFailAndCancelInvokable(org.apache.flink.runtime.execution.ExecutionState, java.lang.Throwable):void");
    }

    public void registerExecutionListener(TaskExecutionStateListener taskExecutionStateListener) {
        this.taskExecutionStateListeners.add(taskExecutionStateListener);
    }

    private void notifyObservers(ExecutionState executionState, Throwable th) {
        TaskExecutionState taskExecutionState = new TaskExecutionState(this.jobId, this.executionId, executionState, th);
        Iterator<TaskExecutionStateListener> it = this.taskExecutionStateListeners.iterator();
        while (it.hasNext()) {
            it.next().notifyTaskExecutionStateChanged(taskExecutionState);
        }
    }

    @Override // org.apache.flink.runtime.taskmanager.TaskActions
    public void triggerPartitionProducerStateCheck(JobID jobID, final IntermediateDataSetID intermediateDataSetID, final ResultPartitionID resultPartitionID) {
        this.partitionProducerStateChecker.requestPartitionProducerState(jobID, intermediateDataSetID, resultPartitionID).handleAsync(new BiFunction<ExecutionState, Throwable, Void>() { // from class: org.apache.flink.runtime.taskmanager.Task.2
            @Override // org.apache.flink.runtime.concurrent.BiFunction
            public Void apply(ExecutionState executionState, Throwable th) {
                try {
                    if (executionState != null) {
                        Task.this.onPartitionStateUpdate(intermediateDataSetID, resultPartitionID, executionState);
                    } else if (th instanceof TimeoutException) {
                        Task.this.onPartitionStateUpdate(intermediateDataSetID, resultPartitionID, ExecutionState.RUNNING);
                    } else if (th instanceof PartitionProducerDisposedException) {
                        Task.LOG.info(String.format("Producer %s of partition %s disposed. Cancelling execution.", resultPartitionID.getProducerId(), resultPartitionID.getPartitionId()), th);
                        Task.this.cancelExecution();
                    } else {
                        Task.this.failExternally(th);
                    }
                    return null;
                } catch (IOException | InterruptedException e) {
                    Task.this.failExternally(e);
                    return null;
                }
            }
        }, this.executor);
    }

    public void triggerCheckpointBarrier(final long j, long j2, final CheckpointOptions checkpointOptions) {
        Object obj = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(j, j2);
        if (this.executionState != ExecutionState.RUNNING || obj == null) {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", this.taskNameWithSubtask, this.executionId);
            this.checkpointResponder.declineCheckpoint(this.jobId, this.executionId, j, new CheckpointDeclineTaskNotReadyException(this.taskNameWithSubtask));
        } else if (!(obj instanceof StatefulTask)) {
            this.checkpointResponder.declineCheckpoint(this.jobId, this.executionId, j, new CheckpointDeclineTaskNotCheckpointingException(this.taskNameWithSubtask));
            LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).", this.taskNameWithSubtask, this.executionId);
        } else {
            final StatefulTask statefulTask = (StatefulTask) obj;
            final String str = this.taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistryForThread = FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
            executeAsyncCallRunnable(new Runnable() { // from class: org.apache.flink.runtime.taskmanager.Task.3
                @Override // java.lang.Runnable
                public void run() {
                    Task.LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistryForThread);
                    try {
                        try {
                            if (!statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions)) {
                                Task.this.checkpointResponder.declineCheckpoint(Task.this.getJobID(), Task.this.getExecutionId(), j, new CheckpointDeclineTaskNotReadyException(str));
                            }
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        } catch (Throwable th) {
                            if (Task.this.getExecutionState() == ExecutionState.RUNNING) {
                                Task.this.failExternally(new Exception("Error while triggering checkpoint " + j + " for " + Task.this.taskNameWithSubtask, th));
                            } else {
                                Task.LOG.debug("Encountered error while triggering checkpoint {} for {} ({}) while being not in state running.", new Object[]{Long.valueOf(j), Task.this.taskNameWithSubtask, Task.this.executionId, th});
                            }
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    } catch (Throwable th2) {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        throw th2;
                    }
                }
            }, String.format("Checkpoint Trigger for %s (%s).", this.taskNameWithSubtask, this.executionId));
        }
    }

    public void notifyCheckpointComplete(final long j) {
        Object obj = this.invokable;
        if (this.executionState != ExecutionState.RUNNING || obj == null) {
            LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", this.taskNameWithSubtask);
        } else {
            if (!(obj instanceof StatefulTask)) {
                LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - {}.", this.taskNameWithSubtask);
                return;
            }
            final StatefulTask statefulTask = (StatefulTask) obj;
            executeAsyncCallRunnable(new Runnable() { // from class: org.apache.flink.runtime.taskmanager.Task.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        statefulTask.notifyCheckpointComplete(j);
                    } catch (Throwable th) {
                        if (Task.this.getExecutionState() == ExecutionState.RUNNING) {
                            Task.this.failExternally(new RuntimeException("Error while confirming checkpoint", th));
                        }
                    }
                }
            }, "Checkpoint Confirmation for " + this.taskNameWithSubtask);
        }
    }

    @VisibleForTesting
    void onPartitionStateUpdate(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID, ExecutionState executionState) throws IOException, InterruptedException {
        if (this.executionState != ExecutionState.RUNNING) {
            LOG.debug("Task {} ignored a partition producer state notification, because it's not running.", this.taskNameWithSubtask);
            return;
        }
        SingleInputGate singleInputGate = this.inputGatesById.get(intermediateDataSetID);
        if (singleInputGate == null) {
            failExternally(new IllegalStateException("Received partition producer state for unknown input gate " + intermediateDataSetID + "."));
            return;
        }
        if (executionState == ExecutionState.SCHEDULED || executionState == ExecutionState.DEPLOYING || executionState == ExecutionState.RUNNING || executionState == ExecutionState.FINISHED) {
            singleInputGate.retriggerPartitionRequest(resultPartitionID.getPartitionId());
            return;
        }
        if (executionState != ExecutionState.CANCELING && executionState != ExecutionState.CANCELED && executionState != ExecutionState.FAILED) {
            failExternally(new IllegalStateException(String.format("Producer with attempt ID %s of partition %s in unexpected state %s.", resultPartitionID.getProducerId(), resultPartitionID.getPartitionId(), executionState)));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cancelling task {} after the producer of partition {} with attempt ID {} has entered state {}.", new Object[]{this.taskNameWithSubtask, resultPartitionID.getPartitionId(), resultPartitionID.getProducerId(), executionState});
        }
        cancelExecution();
    }

    private void executeAsyncCallRunnable(Runnable runnable, String str) {
        synchronized (this) {
            if (this.executionState != ExecutionState.RUNNING) {
                return;
            }
            ExecutorService executorService = this.asyncCallDispatcher;
            if (executorService == null) {
                executorService = Executors.newSingleThreadExecutor(new DispatcherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + this.taskNameWithSubtask));
                this.asyncCallDispatcher = executorService;
                if (this.executionState != ExecutionState.RUNNING) {
                    executorService.shutdown();
                    this.asyncCallDispatcher = null;
                    return;
                }
            }
            LOG.debug("Invoking async call {} on task {}", str, this.taskNameWithSubtask);
            try {
                executorService.submit(runnable);
            } catch (RejectedExecutionException e) {
                if (this.executionState == ExecutionState.RUNNING) {
                    throw new RuntimeException("Async call was rejected, even though the task is running.", e);
                }
            }
        }
    }

    private void cancelInvokable() {
        if (this.invokable == null || this.invokable == null || !this.invokableHasBeenCanceled.compareAndSet(false, true)) {
            return;
        }
        try {
            this.invokable.cancel();
        } catch (Throwable th) {
            LOG.error("Error while canceling task {}.", this.taskNameWithSubtask, th);
        }
    }

    public String toString() {
        return String.format("%s (%s) [%s]", this.taskNameWithSubtask, this.executionId, this.executionState);
    }
}
