package org.apache.tez.dag.app.launcher;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.rm.NMCommunicatorEvent;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorLaunchRequestEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEvent;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/launcher/ContainerLauncherImpl.class */
public class ContainerLauncherImpl extends AbstractService implements ContainerLauncher {
    static final Logger LOG = LoggerFactory.getLogger(ContainerLauncherImpl.class);
    private ConcurrentHashMap<ContainerId, Container> containers;
    private AppContext context;
    protected ThreadPoolExecutor launcherPool;
    protected static final int INITIAL_POOL_SIZE = 10;
    private int limitOnPoolSize;
    private Thread eventHandlingThread;
    protected BlockingQueue<NMCommunicatorEvent> eventQueue;
    private Clock clock;
    private ContainerManagementProtocolProxy cmProxy;
    private AtomicBoolean serviceStopped;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/launcher/ContainerLauncherImpl$Container.class */
    public class Container {
        private ContainerState state = ContainerState.PREP;
        private ContainerId containerID;
        private final String containerMgrAddress;
        private Token containerToken;

        public Container(ContainerId containerId, String str, Token token) {
            this.containerMgrAddress = str;
            this.containerID = containerId;
            this.containerToken = token;
        }

        public synchronized boolean isCompletelyDone() {
            return this.state == ContainerState.DONE || this.state == ContainerState.FAILED;
        }

        public synchronized void launch(NMCommunicatorLaunchRequestEvent nMCommunicatorLaunchRequestEvent) {
            ContainerLauncherImpl.LOG.info("Launching Container with Id: " + nMCommunicatorLaunchRequestEvent.getContainerId());
            if (this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
                this.state = ContainerState.DONE;
                ContainerLauncherImpl.this.sendContainerLaunchFailedMsg(nMCommunicatorLaunchRequestEvent.getContainerId(), "Container was killed before it was launched");
                return;
            }
            try {
                try {
                    ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData cMProxy = ContainerLauncherImpl.this.getCMProxy(this.containerID, this.containerMgrAddress, this.containerToken);
                    ContainerLaunchContext containerLaunchContext = nMCommunicatorLaunchRequestEvent.getContainerLaunchContext();
                    StartContainerRequest startContainerRequest = (StartContainerRequest) Records.newRecord(StartContainerRequest.class);
                    startContainerRequest.setContainerToken(nMCommunicatorLaunchRequestEvent.getContainerToken());
                    startContainerRequest.setContainerLaunchContext(containerLaunchContext);
                    StartContainersResponse startContainers = cMProxy.getContainerManagementProtocol().startContainers(StartContainersRequest.newInstance(Collections.singletonList(startContainerRequest)));
                    if (startContainers.getFailedRequests() != null && !startContainers.getFailedRequests().isEmpty()) {
                        throw ((SerializedException) startContainers.getFailedRequests().get(this.containerID)).deSerialize();
                    }
                    ContainerLauncherImpl.this.context.getEventHandler().handle(new AMContainerEventLaunched(this.containerID));
                    ContainerLauncherImpl.this.context.getHistoryHandler().handle(new DAGHistoryEvent(null, new ContainerLaunchedEvent(this.containerID, ContainerLauncherImpl.this.clock.getTime(), ContainerLauncherImpl.this.context.getApplicationAttemptId())));
                    this.state = ContainerState.RUNNING;
                    if (cMProxy != null) {
                        ContainerLauncherImpl.this.cmProxy.mayBeCloseProxy(cMProxy);
                    }
                } catch (Throwable th) {
                    String str = "Container launch failed for " + this.containerID + " : " + ExceptionUtils.getStackTrace(th);
                    this.state = ContainerState.FAILED;
                    ContainerLauncherImpl.this.sendContainerLaunchFailedMsg(this.containerID, str);
                    if (0 != 0) {
                        ContainerLauncherImpl.this.cmProxy.mayBeCloseProxy((ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData) null);
                    }
                }
            } catch (Throwable th2) {
                if (0 != 0) {
                    ContainerLauncherImpl.this.cmProxy.mayBeCloseProxy((ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData) null);
                }
                throw th2;
            }
        }

        public synchronized void kill() {
            if (isCompletelyDone()) {
                return;
            }
            if (this.state == ContainerState.PREP) {
                this.state = ContainerState.KILLED_BEFORE_LAUNCH;
                return;
            }
            ContainerLauncherImpl.LOG.info("Sending a stop request to the NM for ContainerId: " + this.containerID);
            ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData containerManagementProtocolProxyData = null;
            try {
                try {
                    containerManagementProtocolProxyData = ContainerLauncherImpl.this.getCMProxy(this.containerID, this.containerMgrAddress, this.containerToken);
                    StopContainersRequest stopContainersRequest = (StopContainersRequest) Records.newRecord(StopContainersRequest.class);
                    stopContainersRequest.setContainerIds(Collections.singletonList(this.containerID));
                    containerManagementProtocolProxyData.getContainerManagementProtocol().stopContainers(stopContainersRequest);
                    ContainerLauncherImpl.this.context.getEventHandler().handle(new AMContainerEvent(this.containerID, AMContainerEventType.C_NM_STOP_SENT));
                    if (containerManagementProtocolProxyData != null) {
                        ContainerLauncherImpl.this.cmProxy.mayBeCloseProxy(containerManagementProtocolProxyData);
                    }
                    this.state = ContainerState.DONE;
                } catch (Throwable th) {
                    String str = "cleanup failed for container " + this.containerID + " : " + ExceptionUtils.getStackTrace(th);
                    ContainerLauncherImpl.this.context.getEventHandler().handle(new AMContainerEventStopFailed(this.containerID, str));
                    ContainerLauncherImpl.LOG.warn(str);
                    this.state = ContainerState.DONE;
                    if (containerManagementProtocolProxyData != null) {
                        ContainerLauncherImpl.this.cmProxy.mayBeCloseProxy(containerManagementProtocolProxyData);
                    }
                }
            } catch (Throwable th2) {
                if (containerManagementProtocolProxyData != null) {
                    ContainerLauncherImpl.this.cmProxy.mayBeCloseProxy(containerManagementProtocolProxyData);
                }
                throw th2;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/app/launcher/ContainerLauncherImpl$ContainerState.class */
    public enum ContainerState {
        PREP,
        FAILED,
        RUNNING,
        DONE,
        KILLED_BEFORE_LAUNCH
    }

    /* loaded from: input_file:org/apache/tez/dag/app/launcher/ContainerLauncherImpl$CustomizedRejectedExecutionHandler.class */
    private static class CustomizedRejectedExecutionHandler implements RejectedExecutionHandler {
        private CustomizedRejectedExecutionHandler() {
        }

        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            ContainerLauncherImpl.LOG.warn("Can't submit task to ThreadPoolExecutor:" + threadPoolExecutor);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/app/launcher/ContainerLauncherImpl$EventProcessor.class */
    public class EventProcessor implements Runnable {
        private NMCommunicatorEvent event;

        EventProcessor(NMCommunicatorEvent nMCommunicatorEvent) {
            this.event = nMCommunicatorEvent;
        }

        @Override // java.lang.Runnable
        public void run() {
            ContainerLauncherImpl.LOG.info("Processing the event " + this.event.toString());
            ContainerId containerId = this.event.getContainerId();
            Container container = ContainerLauncherImpl.this.getContainer(this.event);
            switch ((NMCommunicatorEventType) this.event.getType()) {
                case CONTAINER_LAUNCH_REQUEST:
                    container.launch((NMCommunicatorLaunchRequestEvent) this.event);
                    break;
                case CONTAINER_STOP_REQUEST:
                    container.kill();
                    break;
            }
            ContainerLauncherImpl.this.removeContainerIfDone(containerId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Container getContainer(NMCommunicatorEvent nMCommunicatorEvent) {
        ContainerId containerId = nMCommunicatorEvent.getContainerId();
        Container container = this.containers.get(containerId);
        if (container == null) {
            container = new Container(nMCommunicatorEvent.getContainerId(), nMCommunicatorEvent.getNodeId().toString(), nMCommunicatorEvent.getContainerToken());
            Container putIfAbsent = this.containers.putIfAbsent(containerId, container);
            if (putIfAbsent != null) {
                container = putIfAbsent;
            }
        }
        return container;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeContainerIfDone(ContainerId containerId) {
        Container container = this.containers.get(containerId);
        if (container == null || !container.isCompletelyDone()) {
            return;
        }
        this.containers.remove(containerId);
    }

    @Override // org.apache.tez.dag.app.launcher.ContainerLauncher
    public void dagComplete(DAG dag) {
    }

    @Override // org.apache.tez.dag.app.launcher.ContainerLauncher
    public void dagSubmitted() {
    }

    public ContainerLauncherImpl(AppContext appContext) {
        super(ContainerLauncherImpl.class.getName());
        this.containers = new ConcurrentHashMap<>();
        this.eventQueue = new LinkedBlockingQueue();
        this.serviceStopped = new AtomicBoolean(false);
        this.context = appContext;
        this.clock = appContext.getClock();
    }

    public synchronized void serviceInit(Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        configuration2.setInt("ipc.client.connection.maxidletime", 0);
        this.limitOnPoolSize = configuration2.getInt("tez.am.containerlauncher.thread-count-limit", 500);
        LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
    }

    public void serviceStart() {
        this.cmProxy = new ContainerManagementProtocolProxy(new Configuration(getConfig()));
        this.launcherPool = new ThreadPoolExecutor(INITIAL_POOL_SIZE, Integer.MAX_VALUE, 1L, TimeUnit.HOURS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build(), new CustomizedRejectedExecutionHandler());
        this.eventHandlingThread = new Thread() { // from class: org.apache.tez.dag.app.launcher.ContainerLauncherImpl.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                int numNodes;
                int min;
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        NMCommunicatorEvent take = ContainerLauncherImpl.this.eventQueue.take();
                        int corePoolSize = ContainerLauncherImpl.this.launcherPool.getCorePoolSize();
                        if (corePoolSize != ContainerLauncherImpl.this.limitOnPoolSize && corePoolSize < (min = Math.min(ContainerLauncherImpl.this.limitOnPoolSize, (numNodes = ContainerLauncherImpl.this.context.getNodeTracker().getNumNodes())))) {
                            int min2 = Math.min(ContainerLauncherImpl.this.limitOnPoolSize, min + ContainerLauncherImpl.INITIAL_POOL_SIZE);
                            ContainerLauncherImpl.LOG.info("Setting ContainerLauncher pool size to " + min2 + " as number-of-nodes to talk to is " + numNodes);
                            ContainerLauncherImpl.this.launcherPool.setCorePoolSize(min2);
                        }
                        ContainerLauncherImpl.this.launcherPool.execute(ContainerLauncherImpl.this.createEventProcessor(take));
                    } catch (InterruptedException e) {
                        if (ContainerLauncherImpl.this.serviceStopped.get()) {
                            return;
                        }
                        ContainerLauncherImpl.LOG.error("Returning, interrupted : " + e);
                        return;
                    }
                }
            }
        };
        this.eventHandlingThread.setName("ContainerLauncher Event Handler");
        this.eventHandlingThread.start();
    }

    private void shutdownAllContainers() {
        for (Container container : this.containers.values()) {
            if (container != null) {
                container.kill();
            }
        }
    }

    public void serviceStop() {
        if (!this.serviceStopped.compareAndSet(false, true)) {
            LOG.info("Ignoring multiple stops");
            return;
        }
        shutdownAllContainers();
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        if (this.launcherPool != null) {
            this.launcherPool.shutdownNow();
        }
    }

    protected EventProcessor createEventProcessor(NMCommunicatorEvent nMCommunicatorEvent) {
        return new EventProcessor(nMCommunicatorEvent);
    }

    protected ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData getCMProxy(ContainerId containerId, String str, Token token) throws IOException {
        return this.cmProxy.getProxy(str, containerId);
    }

    void sendContainerLaunchFailedMsg(ContainerId containerId, String str) {
        LOG.error(str);
        this.context.getEventHandler().handle(new AMContainerEventLaunchFailed(containerId, str));
    }

    public void handle(NMCommunicatorEvent nMCommunicatorEvent) {
        try {
            this.eventQueue.put(nMCommunicatorEvent);
        } catch (InterruptedException e) {
            throw new TezUncheckedException(e);
        }
    }
}
