package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.QueryableStateUtils;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

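/**
 * Container for the services a TaskManager works with: its location, memory manager, I/O manager,
 * network environment, broadcast variable manager, slot table, job manager table, job leader
 * service and local state store manager. Also provides static helpers that build these services
 * from a configuration and that derive network-buffer and heap memory sizes.
 *
 * <p>Decompiled from: org/apache/flink/runtime/taskexecutor/TaskManagerServices.class
 */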
public class TaskManagerServices {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(TaskManagerServices.class);

    /**
     * Name of the child directory that {@code fromConfiguration} appends to every configured
     * local-recovery root directory.
     */
    @VisibleForTesting
    public static final String LOCAL_STATE_SUB_DIRECTORY_ROOT = "localState";

    // The services bundled by this container. All of them are set once in the constructor
    // (which rejects null arguments via Preconditions.checkNotNull) and exposed through getters.
    private final TaskManagerLocation taskManagerLocation;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final NetworkEnvironment networkEnvironment;
    private final BroadcastVariableManager broadcastVariableManager;
    private final TaskSlotTable taskSlotTable;
    private final JobManagerTable jobManagerTable;
    private final JobLeaderService jobLeaderService;
    private final TaskExecutorLocalStateStoresManager taskManagerStateStore;

    /**
     * Bundles already-constructed services into one container.
     *
     * <p>Every argument is passed through {@code Preconditions.checkNotNull}, so none of them
     * may be {@code null}.
     *
     * @param taskManagerLocation location of this TaskManager
     * @param memoryManager memory manager
     * @param iOManager I/O manager
     * @param networkEnvironment network environment
     * @param broadcastVariableManager broadcast variable manager
     * @param taskSlotTable slot table
     * @param jobManagerTable job manager table
     * @param jobLeaderService job leader service
     * @param taskExecutorLocalStateStoresManager manager of the local state stores
     */
    TaskManagerServices(
            TaskManagerLocation taskManagerLocation,
            MemoryManager memoryManager,
            IOManager iOManager,
            NetworkEnvironment networkEnvironment,
            BroadcastVariableManager broadcastVariableManager,
            TaskSlotTable taskSlotTable,
            JobManagerTable jobManagerTable,
            JobLeaderService jobLeaderService,
            TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager) {

        this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
        this.memoryManager = Preconditions.checkNotNull(memoryManager);
        this.ioManager = Preconditions.checkNotNull(iOManager);
        this.networkEnvironment = Preconditions.checkNotNull(networkEnvironment);
        this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
        this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
        this.jobManagerTable = Preconditions.checkNotNull(jobManagerTable);
        this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);
        this.taskManagerStateStore = Preconditions.checkNotNull(taskExecutorLocalStateStoresManager);
    }

    /** @return the memory manager held by this container */
    public MemoryManager getMemoryManager() {
        return memoryManager;
    }

    /** @return the I/O manager held by this container */
    public IOManager getIOManager() {
        return ioManager;
    }

    /** @return the network environment held by this container */
    public NetworkEnvironment getNetworkEnvironment() {
        return networkEnvironment;
    }

    /** @return the location of this TaskManager */
    public TaskManagerLocation getTaskManagerLocation() {
        return taskManagerLocation;
    }

    /** @return the broadcast variable manager held by this container */
    public BroadcastVariableManager getBroadcastVariableManager() {
        return broadcastVariableManager;
    }

    /** @return the slot table held by this container */
    public TaskSlotTable getTaskSlotTable() {
        return taskSlotTable;
    }

    /** @return the job manager table held by this container */
    public JobManagerTable getJobManagerTable() {
        return jobManagerTable;
    }

    /** @return the job leader service held by this container */
    public JobLeaderService getJobLeaderService() {
        return jobLeaderService;
    }

    /** @return the manager of the local state stores held by this container */
    public TaskExecutorLocalStateStoresManager getTaskManagerStateStore() {
        return taskManagerStateStore;
    }

    /**
     * Stops the contained services one after another: local state store manager, memory manager,
     * I/O manager, network environment, slot table and finally the job leader service.
     *
     * <p>A failing step does not prevent the remaining steps from running. Failures are merged
     * with {@code ExceptionUtils.firstOrSuppressed} and reported once all steps were attempted.
     *
     * @throws FlinkException wrapping the collected failure if at least one step threw
     */
    public void shutDown() throws FlinkException {
        Exception collected = null;

        try {
            taskManagerStateStore.shutdown();
        } catch (Exception e) {
            collected = e;
        }

        try {
            memoryManager.shutdown();
        } catch (Exception e) {
            collected = ExceptionUtils.firstOrSuppressed(e, collected);
        }

        try {
            ioManager.shutdown();
        } catch (Exception e) {
            collected = ExceptionUtils.firstOrSuppressed(e, collected);
        }

        try {
            networkEnvironment.shutdown();
        } catch (Exception e) {
            collected = ExceptionUtils.firstOrSuppressed(e, collected);
        }

        try {
            taskSlotTable.stop();
        } catch (Exception e) {
            collected = ExceptionUtils.firstOrSuppressed(e, collected);
        }

        try {
            jobLeaderService.stop();
        } catch (Exception e) {
            collected = ExceptionUtils.firstOrSuppressed(e, collected);
        }

        if (collected == null) {
            return;
        }
        throw new FlinkException("Could not properly shut down the TaskManager services.", collected);
    }

    /**
     * Creates and wires all TaskManager services from the given configuration.
     *
     * <p>Steps, in order: validate the temp directories, create and start the network
     * environment, derive the TaskManager location from the connection manager's data port,
     * create the memory manager, I/O manager, broadcast variable manager, slot table (one
     * resource profile per configured slot), job manager table, job leader service and the local
     * state store manager (rooted at {@link #LOCAL_STATE_SUB_DIRECTORY_ROOT} below each configured
     * local-recovery directory).
     *
     * <p>NOTE(review): if a step after {@code start()} throws, the already started network
     * environment (and any later created service) is not shut down here - confirm whether callers
     * handle that.
     *
     * @param taskManagerServicesConfiguration configuration of the services
     * @param resourceID resource id used for the TaskManager location
     * @param executor executor handed to the local state store manager
     * @param j forwarded to the memory manager sizing as the heap-based budget (logged there as
     *     "currently free heap space")
     * @param j2 forwarded to network and memory sizing as the maximum memory size
     * @return the fully wired service container
     * @throws Exception if a temp directory is unusable or any service cannot be created/started
     */
    public static TaskManagerServices fromConfiguration(TaskManagerServicesConfiguration taskManagerServicesConfiguration, ResourceID resourceID, Executor executor, long j, long j2) throws Exception {
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

        NetworkEnvironment network = createNetworkEnvironment(taskManagerServicesConfiguration, j2);
        network.start();

        // The data port is only known once the network environment has been started.
        TaskManagerLocation location = new TaskManagerLocation(
                resourceID,
                taskManagerServicesConfiguration.getTaskManagerAddress(),
                network.getConnectionManager().getDataPort());

        MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, j, j2);
        IOManagerAsync ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
        BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

        int numberOfSlots = taskManagerServicesConfiguration.getNumberOfSlots();
        ArrayList<ResourceProfile> resourceProfiles = new ArrayList<ResourceProfile>(numberOfSlots);
        for (int slot = 0; slot < numberOfSlots; slot++) {
            // NOTE(review): (1.0, 42) looks like a placeholder profile shared by all slots - confirm.
            resourceProfiles.add(new ResourceProfile(1.0d, 42));
        }

        TimerService timerService = new TimerService(
                new ScheduledThreadPoolExecutor(1),
                taskManagerServicesConfiguration.getTimerServiceShutdownTimeout());
        TaskSlotTable taskSlotTable = new TaskSlotTable(resourceProfiles, timerService);

        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(location);

        String[] stateRootPaths = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
        File[] stateRootDirs = new File[stateRootPaths.length];
        for (int idx = 0; idx < stateRootPaths.length; idx++) {
            stateRootDirs[idx] = new File(stateRootPaths[idx], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
        TaskExecutorLocalStateStoresManager stateStoresManager = new TaskExecutorLocalStateStoresManager(
                taskManagerServicesConfiguration.isLocalRecoveryEnabled(), stateRootDirs, executor);

        return new TaskManagerServices(
                location,
                memoryManager,
                ioManager,
                network,
                broadcastVariableManager,
                taskSlotTable,
                jobManagerTable,
                jobLeaderService,
                stateStoresManager);
    }

    /**
     * Creates the {@link MemoryManager} with a size derived from the configuration.
     *
     * <ul>
     *   <li>If an explicit size is configured (&gt; 0, in MB) it is used as-is.
     *   <li>Otherwise, for {@code HEAP} memory the size is {@code j * memoryFraction}.
     *   <li>Otherwise, for {@code OFF_HEAP} memory the size is
     *       {@code j2 / (1 - memoryFraction) * memoryFraction}.
     * </ul>
     *
     * @param taskManagerServicesConfiguration source of size, fraction, memory type,
     *     pre-allocation flag, slot count and network buffer size
     * @param j heap budget used for the fraction-based {@code HEAP} sizing
     * @param j2 maximum memory size used for the fraction-based {@code OFF_HEAP} sizing
     * @return the created memory manager
     * @throws Exception if allocating the memory fails with an {@link OutOfMemoryError} for
     *     {@code HEAP} or {@code OFF_HEAP} memory
     * @throws RuntimeException if the memory type is neither {@code HEAP} nor {@code OFF_HEAP}
     *     while the size must be derived from the fraction
     */
    private static MemoryManager createMemoryManager(TaskManagerServicesConfiguration taskManagerServicesConfiguration, long j, long j2) throws Exception {
        final long configuredMemoryMb = taskManagerServicesConfiguration.getConfiguredMemory();
        final MemoryType memoryType = taskManagerServicesConfiguration.getMemoryType();
        final boolean preAllocate = taskManagerServicesConfiguration.isPreAllocateMemory();

        final long memorySizeBytes;
        if (configuredMemoryMb > 0) {
            if (preAllocate) {
                LOG.info("Using {} MB for managed memory.", configuredMemoryMb);
            } else {
                LOG.info("Limiting managed memory to {} MB, memory will be allocated lazily.", configuredMemoryMb);
            }
            // Configured value is in MB; convert to bytes.
            memorySizeBytes = configuredMemoryMb << 20;
        } else {
            final float memoryFraction = taskManagerServicesConfiguration.getMemoryFraction();
            if (memoryType == MemoryType.HEAP) {
                // The product is a float; it has to be narrowed explicitly to be stored in a long.
                memorySizeBytes = (long) (j * memoryFraction);
                if (preAllocate) {
                    LOG.info("Using {} of the currently free heap space for managed heap memory ({} MB).", memoryFraction, memorySizeBytes >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the currently free heap space ({} MB), memory will be allocated lazily.", memoryFraction, memorySizeBytes >> 20);
                }
            } else if (memoryType == MemoryType.OFF_HEAP) {
                // j2 is treated as the (1 - fraction) share; scale it up to the total and take the fraction.
                memorySizeBytes = (long) ((j2 / (1.0d - memoryFraction)) * memoryFraction);
                if (preAllocate) {
                    LOG.info("Using {} of the maximum memory size for managed off-heap memory ({} MB).", memoryFraction, memorySizeBytes >> 20);
                } else {
                    LOG.info("Limiting managed memory to {} of the maximum memory size ({} MB), memory will be allocated lazily.", memoryFraction, memorySizeBytes >> 20);
                }
            } else {
                throw new RuntimeException("No supported memory type detected.");
            }
        }

        try {
            return new MemoryManager(
                    memorySizeBytes,
                    taskManagerServicesConfiguration.getNumberOfSlots(),
                    taskManagerServicesConfiguration.getNetworkConfig().networkBufferSize(),
                    memoryType,
                    preAllocate);
        } catch (OutOfMemoryError e) {
            // Translate the error into a checked exception that names the memory kind and size.
            if (memoryType == MemoryType.HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() + ") while allocating the TaskManager heap memory (" + memorySizeBytes + " bytes).", e);
            }
            if (memoryType == MemoryType.OFF_HEAP) {
                throw new Exception("OutOfMemory error (" + e.getMessage() + ") while allocating the TaskManager off-heap memory (" + memorySizeBytes + " bytes).Try increasing the maximum direct memory (-XX:MaxDirectMemorySize)", e);
            }
            throw e;
        }
    }

    /**
     * Builds the {@link NetworkEnvironment}: network buffer pool, connection manager, partition
     * manager, task event dispatcher, KvState registry plus the queryable-state server and client
     * proxy.
     *
     * <p>A Netty based connection manager is used when a Netty configuration is present;
     * otherwise a {@link LocalConnectionManager} is used and the credit-based flag passed on is
     * {@code false}. Queryable-state thread counts configured as 0 fall back to the number of
     * slots.
     *
     * @param taskManagerServicesConfiguration source of the network and queryable-state settings
     * @param j memory size forwarded to the network buffer memory calculation
     * @return the (not yet started) network environment
     * @throws IllegalArgumentException if the buffer memory corresponds to more than
     *     {@code Integer.MAX_VALUE} buffers
     */
    private static NetworkEnvironment createNetworkEnvironment(TaskManagerServicesConfiguration taskManagerServicesConfiguration, long j) {
        final NetworkEnvironmentConfiguration networkConfig = taskManagerServicesConfiguration.getNetworkConfig();

        // Size the buffer pool: total network memory divided by the size of one buffer.
        final long networkMemoryBytes = calculateNetworkBufferMemory(taskManagerServicesConfiguration, j);
        final int bufferSize = networkConfig.networkBufferSize();
        final long bufferCount = networkMemoryBytes / bufferSize;
        if (bufferCount > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("The given number of memory bytes (" + networkMemoryBytes + ") corresponds to more than MAX_INT pages.");
        }
        final NetworkBufferPool bufferPool = new NetworkBufferPool((int) bufferCount, bufferSize);

        final NettyConfig nettyConfig = networkConfig.nettyConfig();
        final ConnectionManager connectionManager;
        final boolean creditBasedEnabled;
        if (nettyConfig == null) {
            connectionManager = new LocalConnectionManager();
            creditBasedEnabled = false;
        } else {
            connectionManager = new NettyConnectionManager(nettyConfig);
            creditBasedEnabled = nettyConfig.isCreditBasedEnabled();
        }

        final ResultPartitionManager partitionManager = new ResultPartitionManager();
        final TaskEventDispatcher eventDispatcher = new TaskEventDispatcher();
        final KvStateRegistry kvStateRegistry = new KvStateRegistry();
        final QueryableStateConfiguration qsConfig = taskManagerServicesConfiguration.getQueryableStateConfig();

        // A configured thread count of 0 means "use the number of slots".
        final int stateServerThreads = qsConfig.numStateServerThreads() == 0
                ? taskManagerServicesConfiguration.getNumberOfSlots()
                : qsConfig.numStateServerThreads();
        final int stateQueryThreads = qsConfig.numStateQueryThreads() == 0
                ? taskManagerServicesConfiguration.getNumberOfSlots()
                : qsConfig.numStateQueryThreads();
        final int proxyServerThreads = qsConfig.numProxyServerThreads() == 0
                ? taskManagerServicesConfiguration.getNumberOfSlots()
                : qsConfig.numProxyServerThreads();
        final int proxyQueryThreads = qsConfig.numProxyQueryThreads() == 0
                ? taskManagerServicesConfiguration.getNumberOfSlots()
                : qsConfig.numProxyQueryThreads();

        return new NetworkEnvironment(
                bufferPool,
                connectionManager,
                partitionManager,
                eventDispatcher,
                kvStateRegistry,
                QueryableStateUtils.createKvStateServer(
                        taskManagerServicesConfiguration.getTaskManagerAddress(),
                        qsConfig.getStateServerPortRange(),
                        stateServerThreads,
                        stateQueryThreads,
                        kvStateRegistry,
                        new DisabledKvStateRequestStats()),
                QueryableStateUtils.createKvStateClientProxy(
                        taskManagerServicesConfiguration.getTaskManagerAddress(),
                        qsConfig.getProxyPortRange(),
                        proxyServerThreads,
                        proxyQueryThreads,
                        new DisabledKvStateRequestStats()),
                networkConfig.ioMode(),
                networkConfig.partitionRequestInitialBackoff(),
                networkConfig.partitionRequestMaxBackoff(),
                networkConfig.networkBuffersPerChannel(),
                networkConfig.floatingNetworkBuffersPerGate(),
                creditBasedEnabled);
    }

    /**
     * Calculates the number of bytes to use for network buffers from a raw {@link Configuration}.
     *
     * <p>If the fraction/min/max style options are in use
     * ({@code TaskManagerServicesConfiguration.hasNewNetworkBufConf}), the result is
     * {@code fraction * j} clamped to {@code [min, max]}. Otherwise it is the configured number
     * of buffers multiplied by the memory segment size. In both cases the result must be smaller
     * than {@code j} and at least one memory segment large; violations are reported through
     * {@code TaskManagerServicesConfiguration.checkConfigParameter}.
     *
     * @param j total JVM memory size in bytes; must be positive
     * @param configuration configuration to read the network buffer options from
     * @return memory to use for network buffers, in bytes
     */
    public static long calculateNetworkBufferMemory(long j, Configuration configuration) {
        Preconditions.checkArgument(j > 0);

        final int segmentSize = MathUtils.checkedDownCast(
                MemorySize.parse(configuration.getString(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes());
        final String segmentSizeKey = TaskManagerOptions.MEMORY_SEGMENT_SIZE.key();

        final long networkBufBytes;
        if (TaskManagerServicesConfiguration.hasNewNetworkBufConf(configuration)) {
            final float fraction = configuration.getFloat(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION);
            final long minBytes = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN)).getBytes();
            final long maxBytes = MemorySize.parse(configuration.getString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX)).getBytes();
            TaskManagerServicesConfiguration.checkNetworkBufferConfig(segmentSize, fraction, minBytes, maxBytes);

            // Narrow the fractional share to long BEFORE clamping so that min/max are compared
            // as exact longs instead of being rounded through float.
            networkBufBytes = Math.min(maxBytes, Math.max(minBytes, (long) (fraction * j)));

            final String values = "(" + fraction + ", " + minBytes + ", " + maxBytes + ")";
            final String keys = "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", "
                    + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", "
                    + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")";
            TaskManagerServicesConfiguration.checkConfigParameter(
                    networkBufBytes < j,
                    values,
                    keys,
                    "Network buffer memory size too large: " + networkBufBytes + " >= " + j + " (total JVM memory size)");
            TaskManagerServicesConfiguration.checkConfigParameter(
                    networkBufBytes >= ((long) segmentSize),
                    values,
                    keys,
                    "Network buffer memory size too small: " + networkBufBytes + " < " + segmentSize + " (" + segmentSizeKey + ")");
        } else {
            final int numBuffers = configuration.getInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS);
            // Widen before multiplying; int * int would silently overflow for large settings.
            networkBufBytes = (long) numBuffers * (long) segmentSize;
            TaskManagerServicesConfiguration.checkNetworkConfigOld(numBuffers);

            final String numBuffersKey = TaskManagerOptions.NETWORK_NUM_BUFFERS.key();
            TaskManagerServicesConfiguration.checkConfigParameter(
                    networkBufBytes < j,
                    Long.valueOf(networkBufBytes),
                    numBuffersKey,
                    "Network buffer memory size too large: " + networkBufBytes + " >= " + j + " (total JVM memory size)");
            TaskManagerServicesConfiguration.checkConfigParameter(
                    networkBufBytes >= ((long) segmentSize),
                    Long.valueOf(networkBufBytes),
                    numBuffersKey,
                    "Network buffer memory size too small: " + networkBufBytes + " < " + segmentSize + " (" + segmentSizeKey + ")");
        }
        return networkBufBytes;
    }

    /**
     * Calculates the number of bytes to use for network buffers from an already parsed
     * {@link TaskManagerServicesConfiguration}.
     *
     * <p>If the configured minimum equals the maximum, that value is returned directly.
     * Otherwise a total memory figure is derived from {@code j} (for {@code OFF_HEAP} the managed
     * memory is added back, either as the configured size or via the memory fraction), scaled up
     * by {@code 1 / (1 - networkFraction)}, multiplied by the network fraction and clamped to
     * {@code [min, max]}. The result has to be smaller than {@code j}.
     *
     * @param taskManagerServicesConfiguration source of the network and memory settings
     * @param j memory size in bytes (named "maximum JVM heap size" in the validation message)
     * @return memory to use for network buffers, in bytes
     * @throws RuntimeException if the memory type is neither {@code HEAP} nor {@code OFF_HEAP}
     */
    public static long calculateNetworkBufferMemory(TaskManagerServicesConfiguration taskManagerServicesConfiguration, long j) {
        final NetworkEnvironmentConfiguration networkConfig = taskManagerServicesConfiguration.getNetworkConfig();
        final float fraction = networkConfig.networkBufFraction();
        final long minBytes = networkConfig.networkBufMin();
        final long maxBytes = networkConfig.networkBufMax();

        // Fixed size configured: nothing to compute.
        if (minBytes == maxBytes) {
            return minBytes;
        }

        // Memory excluding the network buffers, reconstructed from the given size.
        final long memoryWithoutNetwork;
        final MemoryType memoryType = taskManagerServicesConfiguration.getMemoryType();
        if (memoryType == MemoryType.HEAP) {
            memoryWithoutNetwork = j;
        } else if (memoryType == MemoryType.OFF_HEAP) {
            // Configured managed memory is in MB; convert to bytes.
            final long managedBytes = taskManagerServicesConfiguration.getConfiguredMemory() << 20;
            if (managedBytes > 0) {
                memoryWithoutNetwork = j + managedBytes;
            } else {
                memoryWithoutNetwork = (long) (j / (1.0d - taskManagerServicesConfiguration.getMemoryFraction()));
            }
        } else {
            throw new RuntimeException("No supported memory type detected.");
        }

        final long fractionBytes = (long) ((memoryWithoutNetwork / (1.0d - fraction)) * fraction);
        final long networkBufBytes = Math.min(maxBytes, Math.max(minBytes, fractionBytes));

        final String values = "(" + fraction + ", " + minBytes + ", " + maxBytes + ")";
        final String keys = "(" + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_FRACTION.key() + ", "
                + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN.key() + ", "
                + TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX.key() + ")";
        TaskManagerServicesConfiguration.checkConfigParameter(
                networkBufBytes < j,
                values,
                keys,
                "Network buffer memory size too large: " + networkBufBytes + " >= " + j + "(maximum JVM heap size)");

        return networkBufBytes;
    }

    /**
     * Calculates the heap size in MB that remains after subtracting network buffer memory and,
     * when off-heap memory is enabled, the managed memory.
     *
     * <p>The managed memory size is taken from {@code MANAGED_MEMORY_SIZE}; if that resolves to a
     * non-positive value, {@code MANAGED_MEMORY_FRACTION} of the memory left after the network
     * buffers is used instead.
     *
     * @param j total JVM memory in MB; must be positive
     * @param configuration configuration to read the memory options from
     * @return heap size in MB
     * @throws IllegalConfigurationException if the managed memory size cannot be parsed
     */
    public static long calculateHeapSizeMB(long j, Configuration configuration) {
        Preconditions.checkArgument(j > 0);

        // Network sizing works in bytes; convert MB -> bytes for the call and back for the result.
        final long networkMb = calculateNetworkBufferMemory(j << 20, configuration) >> 20;
        final long withoutNetworkMb = j - networkMb;

        if (!configuration.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
            return withoutNetworkMb;
        }

        long managedMb;
        final String defaultManagedSize = TaskManagerOptions.MANAGED_MEMORY_SIZE.defaultValue();
        if (configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE).equals(defaultManagedSize)) {
            // The default is a plain number without a unit suffix.
            managedMb = Long.parseLong(defaultManagedSize);
        } else {
            try {
                managedMb = MemorySize
                        .parse(configuration.getString(TaskManagerOptions.MANAGED_MEMORY_SIZE), MemorySize.MemoryUnit.MEGA_BYTES)
                        .getMebiBytes();
            } catch (IllegalArgumentException e) {
                throw new IllegalConfigurationException("Could not read " + TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), e);
            }
        }

        if (managedMb <= 0) {
            // No explicit size: fall back to a fraction of what is left after the network buffers.
            managedMb = (long) (configuration.getFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION) * withoutNetworkMb);
        }

        TaskManagerServicesConfiguration.checkConfigParameter(
                managedMb < withoutNetworkMb,
                Long.valueOf(managedMb),
                TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
                "Managed memory size too large for " + networkMb + " MB network buffer memory and a total of " + j + " MB JVM memory");

        return withoutNetworkMb - managedMb;
    }

    /**
     * Validates the temporary directories: each entry must be non-empty, must exist (it is
     * created if missing), must be a directory and must be writable. When INFO logging is
     * enabled, total and usable space of every directory are logged.
     *
     * @param strArr paths of the temporary directories
     * @throws IllegalArgumentException if an entry is {@code null} or empty
     * @throws IOException if a directory cannot be created, is not a directory or is not writable
     */
    private static void checkTempDirs(String[] strArr) throws IOException {
        for (int i = 0; i < strArr.length; i++) {
            final String path = strArr[i];
            if (path == null || path.equals("")) {
                // Report the position of the offending entry instead of a literal placeholder.
                throw new IllegalArgumentException("Temporary file directory #" + i + " is null or empty.");
            }

            final File dir = new File(path);
            if (!dir.exists() && !dir.mkdirs()) {
                throw new IOException("Temporary file directory " + dir.getAbsolutePath() + " does not exist and could not be created.");
            }
            if (!dir.isDirectory()) {
                throw new IOException("Temporary file directory " + dir.getAbsolutePath() + " is not a directory.");
            }
            if (!dir.canWrite()) {
                throw new IOException("Temporary file directory " + dir.getAbsolutePath() + " is not writable.");
            }

            if (LOG.isInfoEnabled()) {
                final long totalBytes = dir.getTotalSpace();
                final long usableBytes = dir.getUsableSpace();
                // Floating-point ratio on the raw byte counts; integer division on the GB values
                // would truncate to 0/1 and divide by zero for file systems smaller than 1 GB.
                final double usablePercent = totalBytes > 0 ? (usableBytes * 100.0d) / totalBytes : 0.0d;
                LOG.info(String.format(
                        "Temporary file directory '%s': total %d GB, usable %d GB (%.2f%% usable)",
                        dir.getAbsolutePath(),
                        totalBytes >> 30,
                        usableBytes >> 30,
                        usablePercent));
            }
        }
    }
}
