package org.apache.hadoop.hive.llap.shufflehandler;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.ssl.SslHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.class */
public class ShuffleHandler implements AttemptRegistrationListener {
    // ---- Configuration keys and their defaults ----

    // Key for the shuffle local directories; the same string is handed to the LocalDirAllocator below.
    public static final String SHUFFLE_HANDLER_LOCAL_DIRS = "llap.shuffle.handler.local-dirs";
    // NOTE(review): the key starts with "lla[." where every sibling key starts with "llap." — this looks
    // like a typo. It is left untouched here because changing it changes the effective configuration key.
    public static final String SHUFFLE_MANAGE_OS_CACHE = "lla[.shuffle.manage.os.cache";
    public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
    public static final String SHUFFLE_READAHEAD_BYTES = "llap.shuffle.readahead.bytes";
    // 4 * 1024 * 1024.
    public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4194304;
    public static final String SHUFFLE_DIR_WATCHER_ENABLED = "llap.shuffle.dir-watcher.enabled";
    public static final boolean SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false;

    // ---- Per-instance state ----

    private int port;
    private final ChannelFactory selector;
    protected HttpPipelineFactory pipelineFact;
    // Buffer size handed to FadvisedChunkedFile when the channel pipeline contains an SslHandler.
    private final int sslFileBufferSize;
    private final Configuration conf;
    private final String[] localDirs;
    // May be null: the path-cache loader checks for null before notifying it.
    private final DirWatcher dirWatcher;
    // The next two values are forwarded to FadvisedFileRegion / FadvisedChunkedFile when a map output is sent.
    private final boolean manageOsCache;
    private final int readaheadLength;
    // Connection cap enforced in Shuffle.channelOpen; a value <= 0 disables the cap.
    private final int maxShuffleConnections;
    // The next two values are forwarded to FadvisedFileRegion on the non-SSL send path.
    private final int shuffleBufferSize;
    private final boolean shuffleTransferToAllowed;
    // Looked up by job id in Shuffle.messageReceived; the value is used as the user for that job.
    private final ConcurrentMap<String, String> userRsrc;
    // Source of the per-job token secret used by Shuffle.verifyRequest.
    private JobTokenSecretManager secretManager;

    // ---- More configuration keys and defaults ----

    public static final String SHUFFLE_PORT_CONFIG_KEY = "llap.shuffle.port";
    public static final int DEFAULT_SHUFFLE_PORT = 15551;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = "llap.shuffle.connection-keep-alive.enable";
    public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = "llap.shuffle.connection-keep-alive.timeout";
    public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5;
    public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = "llap.shuffle.mapoutput-info.meta.cache.size";
    public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = 10000;
    // Value written into the "Connection" response header when keep-alive is not in effect.
    public static final String CONNECTION_CLOSE = "close";
    // NOTE(review): "SUFFLE" (sic) in the next two constant names; renaming would break external references.
    public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "llap.shuffle.ssl.file.buffer.size";
    // 60 * 1024.
    public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 61440;
    public static final String MAX_SHUFFLE_CONNECTIONS = "llap.shuffle.max.connections";
    public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0;
    public static final String MAX_SHUFFLE_THREADS = "llap.shuffle.max.threads";
    public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
    public static final String SHUFFLE_BUFFER_SIZE = "llap.shuffle.transfer.buffer.size";
    // 128 * 1024.
    public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 131072;
    public static final String SHUFFLE_TRANSFERTO_ALLOWED = "llap.shuffle.transferTo.allowed";
    public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
    public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = false;

    // File names appended to an attempt's base directory by the path-cache loader.
    static final String DATA_FILE_NAME = "file.out";
    static final String INDEX_FILE_NAME = "file.out.index";
    private static ShuffleHandler INSTANCE;
    // Keep-alive settings consulted by Shuffle.setResponseHeaders.
    final boolean connectionKeepAliveEnabled;
    final int connectionKeepAliveTimeOut;
    // Upper bound on the number of MapOutputInfo entries remembered per request in Shuffle.populateHeaders.
    final int mapOutputMetaInfoCacheSize;
    private final Shuffle shuffle;
    private static final String USERCACHE_CONSTANT = "usercache";
    private static final String APPCACHE_CONSTANT = "appcache";
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleHandler.class);
    // Flag 2 is Pattern.CASE_INSENSITIVE. IOExceptions whose message matches are logged at debug level
    // only by Shuffle.exceptionCaught.
    private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile("^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$", 2);
    private static final AtomicBoolean started = new AtomicBoolean(false);
    private static final AtomicBoolean initing = new AtomicBoolean(false);
    // Channels admitted by Shuffle.channelOpen; its size is what the connection cap is compared against.
    private final ChannelGroup accepted = new DefaultChannelGroup();
    private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
    private final ConcurrentMap<String, Integer> registeredApps = new ConcurrentHashMap();
    // Resolves relative attempt paths to readable local paths in the path-cache loader.
    private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(SHUFFLE_HANDLER_LOCAL_DIRS);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler$AttemptPathIdentifier.class */
    /**
     * Key identifying the shuffle output of a single task attempt.
     *
     * <p>Two identifiers are equal when their job id, DAG id and attempt id match. The user is
     * carried along and printed by {@link #toString()}, but takes no part in {@link #equals(Object)}
     * or {@link #hashCode()}.
     */
    public static class AttemptPathIdentifier {
        private final String jobId;
        private final int dagId;
        private final String user;
        private final String attemptId;

        /**
         * @param jobId     job the attempt belongs to
         * @param dagId     numeric DAG id within the job
         * @param user      user associated with the job (not part of the identity)
         * @param attemptId attempt id
         */
        public AttemptPathIdentifier(String jobId, int dagId, String user, String attemptId) {
            this.jobId = jobId;
            this.dagId = dagId;
            this.user = user;
            this.attemptId = attemptId;
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other == null || other.getClass() != getClass()) {
                return false;
            }
            AttemptPathIdentifier that = (AttemptPathIdentifier) other;
            // The cheap int comparison goes first; user is deliberately left out.
            return dagId == that.dagId
                    && jobId.equals(that.jobId)
                    && attemptId.equals(that.attemptId);
        }

        @Override
        public int hashCode() {
            // Same fields as equals(): jobId, dagId, attemptId.
            int hash = jobId.hashCode();
            hash = 31 * hash + dagId;
            hash = 31 * hash + attemptId.hashCode();
            return hash;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("AttemptPathIdentifier{jobId='");
            text.append(jobId);
            text.append("', dagId=").append(dagId);
            text.append(", user='").append(user);
            text.append("', attemptId='").append(attemptId);
            text.append("'}");
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler$AttemptPathInfo.class */
    /**
     * Immutable pair of resolved local paths for one task attempt: its index file and its data file.
     */
    public static class AttemptPathInfo {
        private final Path indexPath;
        private final Path dataPath;

        /**
         * @param indexPath location of the attempt's index file
         * @param dataPath  location of the attempt's data file
         */
        public AttemptPathInfo(Path indexPath, Path dataPath) {
            this.indexPath = indexPath;
            this.dataPath = dataPath;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler$HttpPipelineFactory.class */
    /**
     * Builds the Netty channel pipeline used for every accepted shuffle connection.
     *
     * <p>All pipelines share the single {@link Shuffle} handler obtained at construction time.
     * No assignment to {@code sslFactory} is visible in this class, so the SSL handler is only
     * installed if that field is populated elsewhere.
     */
    public class HttpPipelineFactory implements ChannelPipelineFactory {
        final Shuffle SHUFFLE;
        private SSLFactory sslFactory;

        /**
         * @param configuration configuration forwarded to {@code getShuffle}
         * @throws Exception whatever {@code getShuffle} throws
         */
        public HttpPipelineFactory(Configuration configuration) throws Exception {
            SHUFFLE = getShuffle(configuration);
        }

        /** Releases the SSL factory, if one was ever set. */
        public void destroy() {
            if (sslFactory == null) {
                return;
            }
            sslFactory.destroy();
        }

        /**
         * Assembles a fresh pipeline: optional SSL, HTTP decode, chunk aggregation (64 KiB limit),
         * HTTP encode, chunked writes, and finally the shared shuffle handler.
         */
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline handlers = Channels.pipeline();
            SSLFactory ssl = sslFactory;
            if (ssl != null) {
                handlers.addLast("ssl", new SslHandler(ssl.createSSLEngine()));
            }
            handlers.addLast("decoder", new HttpRequestDecoder());
            handlers.addLast("aggregator", new HttpChunkAggregator(1 << 16));
            handlers.addLast("encoder", new HttpResponseEncoder());
            handlers.addLast("chunking", new ChunkedWriteHandler());
            handlers.addLast("shuffle", SHUFFLE);
            return handlers;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler$Shuffle.class */
    public class Shuffle extends SimpleChannelUpstreamHandler {
        // Configuration passed to LocalDirAllocator lookups in the path-cache loader below.
        private final Configuration conf;
        // Source of per-partition index records (see getMapOutputInfo).
        private final IndexCache indexCache;
        // Port used when rebuilding the request URL for hash verification; initialised from
        // configuration in the constructor and overridable through setPort.
        private int port;
        // Cache from attempt identifier to the resolved index/data paths.
        //  - entries expire 300 seconds after their last access and are held through soft references;
        //  - total weight is capped at 10485760 (10 * 1024 * 1024), where an entry's weight is the
        //    summed character length of its job id, user, attempt id and both path strings;
        //  - evictions are logged at debug level;
        //  - on a miss the loader resolves both files through lDirAlloc and, when a dir watcher is
        //    configured, reports the attempt to it.
        private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache = CacheBuilder.newBuilder().expireAfterAccess(300, TimeUnit.SECONDS).softValues().concurrencyLevel(16).removalListener(new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() { // from class: org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler.Shuffle.3
            public void onRemoval(RemovalNotification<AttemptPathIdentifier, AttemptPathInfo> removalNotification) {
                ShuffleHandler.LOG.debug("PathCacheEviction: " + removalNotification.getKey() + ", Reason=" + removalNotification.getCause());
            }
        }).maximumWeight(10485760).weigher(new Weigher<AttemptPathIdentifier, AttemptPathInfo>() { // from class: org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler.Shuffle.2
            public int weigh(AttemptPathIdentifier attemptPathIdentifier, AttemptPathInfo attemptPathInfo) {
                return attemptPathIdentifier.jobId.length() + attemptPathIdentifier.user.length() + attemptPathIdentifier.attemptId.length() + attemptPathInfo.indexPath.toString().length() + attemptPathInfo.dataPath.toString().length();
            }
        }).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() { // from class: org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler.Shuffle.1
            public AttemptPathInfo load(AttemptPathIdentifier attemptPathIdentifier) throws Exception {
                // Relative attempt directory: base location for (job, dag, user) followed by the attempt id.
                String str = ShuffleHandler.getBaseLocation(attemptPathIdentifier.jobId, attemptPathIdentifier.dagId, attemptPathIdentifier.user) + attemptPathIdentifier.attemptId;
                Path localPathToRead = ShuffleHandler.this.lDirAlloc.getLocalPathToRead(str + "/" + ShuffleHandler.INDEX_FILE_NAME, Shuffle.this.conf);
                Path localPathToRead2 = ShuffleHandler.this.lDirAlloc.getLocalPathToRead(str + "/" + ShuffleHandler.DATA_FILE_NAME, Shuffle.this.conf);
                ShuffleHandler.LOG.debug("Loaded : " + attemptPathIdentifier + " via loader");
                if (ShuffleHandler.this.dirWatcher != null) {
                    ShuffleHandler.this.dirWatcher.attemptInfoFound(attemptPathIdentifier);
                }
                return new AttemptPathInfo(localPathToRead, localPathToRead2);
            }
        });

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler$Shuffle$MapOutputInfo.class */
        /**
         * Pairs a map output data file with the index record describing the partition to serve
         * from it.
         */
        public class MapOutputInfo {
            final Path mapOutputFileName;
            final TezIndexRecord indexRecord;

            /**
             * @param mapOutputFileName path of the data file holding the map output
             * @param indexRecord       index record for the requested partition
             */
            MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord) {
                this.mapOutputFileName = mapOutputFileName;
                this.indexRecord = indexRecord;
            }
        }

        public Shuffle(Configuration configuration) {
            this.conf = configuration;
            this.indexCache = new IndexCache(configuration);
            this.port = configuration.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
        }

        /**
         * Overrides the port used when reconstructing request URLs for verification.
         *
         * @param port the port to use from now on
         */
        public void setPort(int port) {
            this.port = port;
        }

        /**
         * Puts already-known attempt paths straight into the path cache, bypassing the loader.
         *
         * @param identifier attempt the paths belong to
         * @param pathInfo   resolved index/data paths for that attempt
         */
        void registerAttemptDirs(AttemptPathIdentifier identifier, AttemptPathInfo pathInfo) {
            String message = "Registering " + identifier + " via watcher";
            ShuffleHandler.LOG.debug(message);
            pathCache.put(identifier, pathInfo);
        }

        /**
         * Flattens a list of comma-separated values into a single list of individual entries.
         *
         * @param list raw values, each possibly containing several comma-separated ids; may be null
         * @return the individual entries in encounter order, or null when {@code list} is null
         */
        private List<String> splitMaps(List<String> list) {
            if (list == null) {
                return null;
            }
            List<String> entries = new ArrayList<String>();
            for (String joined : list) {
                for (String entry : joined.split(",")) {
                    entries.add(entry);
                }
            }
            return entries;
        }

        /**
         * Admits a newly opened channel unless the configured connection cap has been reached.
         *
         * <p>A cap of zero or less means unlimited. A rejected channel is closed immediately and
         * is not added to the accepted group.
         */
        public void channelOpen(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
            final int limit = ShuffleHandler.this.maxShuffleConnections;
            if (limit > 0 && ShuffleHandler.this.accepted.size() >= limit) {
                ShuffleHandler.LOG.info(String.format("Current number of shuffle connections (%d) is greater than or equal to the max allowed shuffle connections (%d)", Integer.valueOf(ShuffleHandler.this.accepted.size()), Integer.valueOf(ShuffleHandler.this.maxShuffleConnections)));
                channelStateEvent.getChannel().close();
                return;
            }
            ShuffleHandler.this.accepted.add(channelStateEvent.getChannel());
            super.channelOpen(channelHandlerContext, channelStateEvent);
        }

        /**
         * Serves one shuffle fetch request.
         *
         * <p>Steps, in order; each failure answers with an error response via {@code sendError}
         * (which closes the channel after the write) and stops processing:
         * <ol>
         *   <li>only GET is accepted (405 otherwise);</li>
         *   <li>the shuffle name/version request headers must be {@code mapreduce} / {@code 1.0.0} (400);</li>
         *   <li>query parameters {@code map}, {@code reduce}, {@code job} and {@code dag} must be present,
         *       {@code map} must yield at least one id, and the other three must be single-valued (400);</li>
         *   <li>{@code reduce} and {@code dag} must parse as integers (400);</li>
         *   <li>the request hash is verified against the job's token secret (401);</li>
         *   <li>response headers are computed and written, then each requested map output is streamed
         *       (404 when a data file is missing, 500 on I/O errors).</li>
         * </ol>
         *
         * @param channelHandlerContext Netty context of the channel the request arrived on
         * @param messageEvent          event carrying the {@link HttpRequest}
         */
        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
            HttpRequest request = (HttpRequest) messageEvent.getMessage();
            if (request.getMethod() != HttpMethod.GET) {
                sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
                return;
            }
            // NOTE(review): the header-name constant is taken from LlapOptionsProcessor exactly as the
            // original code does; presumably it resolves to the shuffle "name" header — confirm.
            if (!"mapreduce".equals(request.headers().get(LlapOptionsProcessor.OPTION_NAME))
                    || !"1.0.0".equals(request.headers().get("version"))) {
                sendError(channelHandlerContext, "Incompatible shuffle request version", HttpResponseStatus.BAD_REQUEST);
                // The error response closes the channel, so the request must not be processed further.
                return;
            }

            Map<String, List<String>> parameters = new QueryStringDecoder(request.getUri()).getParameters();
            List<String> keepAliveValues = parameters.get("keepAlive");
            boolean keepAlive = false;
            if (keepAliveValues != null && keepAliveValues.size() == 1) {
                keepAlive = Boolean.parseBoolean(keepAliveValues.get(0));
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("KeepAliveParam : " + keepAliveValues + " : " + keepAlive);
                }
            }
            List<String> mapIds = splitMaps(parameters.get("map"));
            List<String> reduceValues = parameters.get("reduce");
            List<String> jobValues = parameters.get("job");
            List<String> dagValues = parameters.get("dag");
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("RECV: " + request.getUri() + "\n  mapId: " + mapIds + "\n  reduceId: " + reduceValues + "\n  jobId: " + jobValues + "\n  dagId: " + dagValues + "\n  keepAlive: " + keepAlive);
            }

            // An empty map list (e.g. "map=,") is treated like a missing parameter: there is nothing
            // to send, and the code after the send loop needs at least one completed write.
            if (mapIds == null || mapIds.isEmpty() || reduceValues == null || jobValues == null || dagValues == null) {
                sendError(channelHandlerContext, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (reduceValues.size() != 1 || jobValues.size() != 1 || dagValues.size() != 1) {
                sendError(channelHandlerContext, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
                return;
            }

            int reduceId;
            String jobId;
            int dagId;
            // Only the parameter parsing is guarded, so failures deeper in the request handling are
            // not misreported as bad request parameters.
            try {
                reduceId = Integer.parseInt(reduceValues.get(0));
                jobId = jobValues.get(0);
                dagId = Integer.parseInt(dagValues.get(0));
            } catch (NumberFormatException e) {
                sendError(channelHandlerContext, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            } catch (IllegalArgumentException e) {
                sendError(channelHandlerContext, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            }

            String requestUri = request.getUri();
            if (requestUri == null) {
                sendError(channelHandlerContext, HttpResponseStatus.FORBIDDEN);
                return;
            }

            HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            try {
                verifyRequest(jobId, channelHandlerContext, request, response, new URL("http", "", this.port, requestUri));
            } catch (IOException e) {
                ShuffleHandler.LOG.warn("Shuffle failure ", e);
                sendError(channelHandlerContext, e.getMessage(), HttpResponseStatus.UNAUTHORIZED);
                return;
            }

            // Map output metadata resolved while computing headers, reused when sending the data.
            Map<String, MapOutputInfo> resolvedOutputs = new HashMap<String, MapOutputInfo>();
            Channel channel = messageEvent.getChannel();
            String user = ShuffleHandler.this.userRsrc.get(jobId);
            try {
                populateHeaders(mapIds, jobId, dagId, user, reduceId, response, keepAlive, resolvedOutputs);
            } catch (IOException e) {
                // Kept from the original: the prepared response is still written before the error.
                channel.write(response);
                ShuffleHandler.LOG.error("Shuffle error in populating headers :", e);
                sendError(channelHandlerContext, getErrorMessage(e), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                return;
            }
            channel.write(response);

            ChannelFuture lastWrite = null;
            for (String mapId : mapIds) {
                try {
                    MapOutputInfo outputInfo = resolvedOutputs.get(mapId);
                    if (outputInfo == null) {
                        // Not remembered by populateHeaders (its per-request map is size-capped).
                        outputInfo = getMapOutputInfo(jobId, dagId, mapId, reduceId, user);
                    }
                    lastWrite = sendMapOutput(channelHandlerContext, channel, user, mapId, reduceId, outputInfo);
                    if (lastWrite == null) {
                        sendError(channelHandlerContext, HttpResponseStatus.NOT_FOUND);
                        return;
                    }
                } catch (IOException e) {
                    ShuffleHandler.LOG.error("Shuffle error :", e);
                    sendError(channelHandlerContext, getErrorMessage(e), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    return;
                }
            }
            // mapIds is non-empty and every iteration either returned or produced a future, so
            // lastWrite is non-null here.
            // NOTE(review): the channel is closed after the last write even when keep-alive headers
            // were set on the response — confirm whether that is intended.
            lastWrite.addListener(ChannelFutureListener.CLOSE);
        }

        /**
         * Concatenates the messages of a throwable and all of its causes, outermost first, with no
         * separator.
         *
         * <p>Throwables without a message contribute nothing, so an exception created without a
         * message no longer triggers a NullPointerException here.
         *
         * @param th the failure to describe; a null argument yields an empty string
         * @return the concatenated messages, possibly empty
         */
        private String getErrorMessage(Throwable th) {
            StringBuilder combined = new StringBuilder();
            for (Throwable current = th; current != null; current = current.getCause()) {
                String message = current.getMessage();
                if (message != null) {
                    combined.append(message);
                }
            }
            return combined.toString();
        }

        /**
         * Resolves the data file and index record for one map output partition.
         *
         * <p>The attempt's paths come from the path cache (loaded on demand); the index record comes
         * from the index cache.
         *
         * @param jobId    job id from the request
         * @param dagId    DAG id from the request
         * @param mapId    map attempt id whose output is wanted
         * @param reduceId partition (reducer) number
         * @param user     user associated with the job
         * @return the data file path together with the partition's index record
         * @throws IOException if the path-cache load failed with an IOException, or if the index
         *                     lookup throws one; other load failures surface as RuntimeException
         */
        protected MapOutputInfo getMapOutputInfo(String jobId, int dagId, String mapId, int reduceId, String user) throws IOException {
            AttemptPathIdentifier cacheKey = new AttemptPathIdentifier(jobId, dagId, user, mapId);
            try {
                AttemptPathInfo paths = this.pathCache.get(cacheKey);
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("Retrieved pathInfo for " + cacheKey + " check for corresponding loaded messages to determine whether it was loaded or cached");
                }
                TezIndexRecord record = this.indexCache.getIndexInformation(mapId, reduceId, paths.indexPath, user);
                if (ShuffleHandler.LOG.isDebugEnabled()) {
                    ShuffleHandler.LOG.debug("jobId=" + jobId + ", mapId=" + mapId + ",dataFile=" + paths.dataPath + ", indexFile=" + paths.indexPath);
                }
                return new MapOutputInfo(paths.dataPath, record);
            } catch (ExecutionException loadFailure) {
                // Unwrap: callers handle IOException; anything else becomes unchecked.
                Throwable cause = loadFailure.getCause();
                if (!(cause instanceof IOException)) {
                    throw new RuntimeException(cause);
                }
                throw (IOException) cause;
            }
        }

        /**
         * Computes the total response body length for the requested map outputs and sets the
         * response headers accordingly.
         *
         * <p>For every map id the body consists of a serialized {@link ShuffleHeader} followed by the
         * partition bytes, so the content length is the sum of both over all ids. Resolved
         * {@link MapOutputInfo} objects are remembered in {@code map} (up to
         * {@code mapOutputMetaInfoCacheSize} entries) so the caller can reuse them when sending.
         *
         * @param list         map attempt ids to serve
         * @param str          job id
         * @param i            DAG id
         * @param str2         user associated with the job
         * @param i2           partition (reducer) number
         * @param httpResponse response whose headers are populated
         * @param z            keep-alive flag from the request
         * @param map          output: map id to resolved info, size-capped
         * @throws IOException if a map output cannot be resolved or a header cannot be serialized
         */
        protected void populateHeaders(List<String> list, String str, int i, String str2, int i2, HttpResponse httpResponse, boolean z, Map<String, MapOutputInfo> map) throws IOException {
            long contentLength = 0;
            for (String mapId : list) {
                MapOutputInfo outputInfo = getMapOutputInfo(str, i, mapId, i2, str2);
                if (map.size() < ShuffleHandler.this.mapOutputMetaInfoCacheSize) {
                    map.put(mapId, outputInfo);
                }
                ShuffleHeader header = new ShuffleHeader(mapId, outputInfo.indexRecord.getPartLength(), outputInfo.indexRecord.getRawLength(), i2);
                // Serialize the header into a scratch buffer purely to learn its encoded size.
                DataOutputBuffer headerBytes = new DataOutputBuffer();
                header.write(headerBytes);
                contentLength += outputInfo.indexRecord.getPartLength();
                contentLength += headerBytes.getLength();
            }
            setResponseHeaders(httpResponse, z, contentLength);
        }

        /**
         * Sets connection-related response headers.
         *
         * <p>When keep-alive is in effect (enabled on the handler or requested by the client) the
         * content length and keep-alive headers are added; otherwise only {@code Connection: close}.
         *
         * @param httpResponse response to decorate
         * @param z            keep-alive flag from the request
         * @param j            total body length in bytes
         */
        protected void setResponseHeaders(HttpResponse httpResponse, boolean z, long j) {
            boolean keepAlive = ShuffleHandler.this.connectionKeepAliveEnabled || z;
            if (keepAlive) {
                httpResponse.headers().add("Content-Length", String.valueOf(j));
                httpResponse.headers().add("Connection", "keep-alive");
                httpResponse.headers().add("keep-alive", "timeout=" + ShuffleHandler.this.connectionKeepAliveTimeOut);
                ShuffleHandler.LOG.info("Content Length in shuffle : " + j);
                return;
            }
            ShuffleHandler.LOG.info("Setting connection close header...");
            httpResponse.headers().add("Connection", ShuffleHandler.CONNECTION_CLOSE);
        }

        /**
         * Authenticates a fetch request and prepares the reply hash.
         *
         * <p>The job's token secret is looked up, the request's {@code UrlHash} header is checked
         * against the message built from {@code url} via {@code SecureShuffleUtils.verifyReply}, and
         * on success a {@code ReplyHash} header plus the shuffle name/version headers are added to
         * the response.
         *
         * @param str                   job id whose token secret is used
         * @param channelHandlerContext unused here
         * @param httpRequest           request carrying the {@code UrlHash} header
         * @param httpResponse          response that receives the reply headers
         * @param url                   URL the request hash was computed over
         * @throws IOException if the job has no known secret, the hash header is missing, or
         *                     verification/hashing fails
         */
        protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
            SecretKey tokenSecret = ShuffleHandler.this.secretManager.retrieveTokenSecret(str);
            if (tokenSecret == null) {
                ShuffleHandler.LOG.info("Request for unknown token " + str);
                throw new IOException("could not find jobid");
            }
            String encodedUrl = SecureShuffleUtils.buildMsgFrom(url);
            String urlHash = httpRequest.headers().get("UrlHash");
            if (urlHash == null) {
                ShuffleHandler.LOG.info("Missing header hash for " + str);
                throw new IOException("fetcher cannot be authenticated");
            }
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("verifying request. enc_str=" + encodedUrl + "; hash=..." + hashTail(urlHash));
            }
            SecureShuffleUtils.verifyReply(urlHash, encodedUrl, tokenSecret);
            String replyHash = SecureShuffleUtils.generateHash(urlHash.getBytes(Charsets.UTF_8), tokenSecret);
            httpResponse.headers().add("ReplyHash", replyHash);
            httpResponse.headers().add(LlapOptionsProcessor.OPTION_NAME, "mapreduce");
            httpResponse.headers().add("version", "1.0.0");
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("Fetcher request verfied. enc_str=" + encodedUrl + ";reply=" + hashTail(replyHash));
            }
        }

        /**
         * Returns the fragment of a hash shown in debug logs: the second half of the string minus
         * its final character. The bounds are clamped so that a client-supplied hash of length 0
         * or 1 yields an empty string instead of a StringIndexOutOfBoundsException.
         */
        private String hashTail(String hash) {
            int length = hash.length();
            int end = Math.max(length - 1, 0);
            int begin = Math.min(length - (length / 2), end);
            return hash.substring(begin, end);
        }

        /**
         * Writes one map output partition to the channel: first its serialized shuffle header, then
         * the partition bytes read from the data file.
         *
         * <p>Without an {@link SslHandler} in the pipeline the bytes go out as a
         * {@code FadvisedFileRegion} whose resources are released when the write completes; with SSL
         * they go out as a {@code FadvisedChunkedFile}.
         *
         * @param channelHandlerContext unused here
         * @param channel               channel to write to
         * @param str                   expected owner passed to the secure file open
         * @param str2                  map attempt id, recorded in the header
         * @param i                     partition (reducer) number, recorded in the header
         * @param mapOutputInfo         data file and index record to serve
         * @return the future of the data write, or null if the data file does not exist
         *         (the header has already been written in that case)
         * @throws IOException on other failures opening or preparing the file
         */
        protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, int i, MapOutputInfo mapOutputInfo) throws IOException {
            final TezIndexRecord record = mapOutputInfo.indexRecord;
            ShuffleHeader header = new ShuffleHeader(str2, record.getPartLength(), record.getRawLength(), i);
            DataOutputBuffer headerBytes = new DataOutputBuffer();
            header.write(headerBytes);
            channel.write(ChannelBuffers.wrappedBuffer(headerBytes.getData(), 0, headerBytes.getLength()));

            File dataFile = new File(mapOutputInfo.mapOutputFileName.toString());
            try {
                RandomAccessFile data = SecureIOUtils.openForRandomRead(dataFile, "r", str, (String) null);
                boolean sslInPipeline = channel.getPipeline().get(SslHandler.class) != null;
                if (sslInPipeline) {
                    return channel.write(new FadvisedChunkedFile(data, record.getStartOffset(), record.getPartLength(), ShuffleHandler.this.sslFileBufferSize, ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, dataFile.getAbsolutePath()));
                }
                final FadvisedFileRegion region = new FadvisedFileRegion(data, record.getStartOffset(), record.getPartLength(), ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, dataFile.getAbsolutePath(), ShuffleHandler.this.shuffleBufferSize, ShuffleHandler.this.shuffleTransferToAllowed);
                ChannelFuture transfer = channel.write(region);
                transfer.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture completed) {
                        // Report success first, then always free the region's resources.
                        if (completed.isSuccess()) {
                            region.transferSuccessful();
                        }
                        region.releaseExternalResources();
                    }
                });
                return transfer;
            } catch (FileNotFoundException missing) {
                ShuffleHandler.LOG.info(dataFile + " not found");
                return null;
            }
        }

        /**
         * Sends an error response with an empty body and closes the channel afterwards.
         *
         * @param channelHandlerContext context of the channel to answer on
         * @param httpResponseStatus    HTTP status to report
         */
        protected void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
            sendError(channelHandlerContext, "", httpResponseStatus);
        }

        /**
         * Sends a plain-text error response and closes the channel once it has been written.
         *
         * <p>The response carries the shuffle name/version headers so the fetcher can recognise it.
         *
         * @param channelHandlerContext context of the channel to answer on
         * @param str                   message used as the response body; null is treated as empty,
         *                              because callers pass {@code Throwable.getMessage()}, which may
         *                              be null
         * @param httpResponseStatus    HTTP status to report
         */
        protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
            String body = (str == null) ? "" : str;
            HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
            response.headers().add("Content-Type", "text/plain; charset=UTF-8");
            response.headers().add(LlapOptionsProcessor.OPTION_NAME, "mapreduce");
            response.headers().add("version", "1.0.0");
            response.setContent(ChannelBuffers.copiedBuffer(body, CharsetUtil.UTF_8));
            channelHandlerContext.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
        }

        /**
         * Handles exceptions raised in the pipeline.
         *
         * <ul>
         *   <li>Over-long frames are answered with 400.</li>
         *   <li>Closed-channel errors, and I/O errors whose message matches the ignorable pattern
         *       (connection reset/closed, broken pipe), are logged at debug level and dropped.</li>
         *   <li>Everything else is logged as an error and, if the channel is still connected,
         *       answered with 500.</li>
         * </ul>
         */
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            Channel source = exceptionEvent.getChannel();
            Throwable failure = exceptionEvent.getCause();

            if (failure instanceof TooLongFrameException) {
                sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
                return;
            }
            // ClosedChannelException is itself an IOException, so it can be tested directly.
            if (failure instanceof ClosedChannelException) {
                ShuffleHandler.LOG.debug("Ignoring closed channel error", failure);
                return;
            }
            if (failure instanceof IOException
                    && ShuffleHandler.IGNORABLE_ERROR_MESSAGE.matcher(String.valueOf(failure.getMessage())).matches()) {
                ShuffleHandler.LOG.debug("Ignoring client socket close", failure);
                return;
            }

            ShuffleHandler.LOG.error("Shuffle error: ", failure);
            if (!source.isConnected()) {
                return;
            }
            ShuffleHandler.LOG.error("Shuffle error " + exceptionEvent);
            sendError(channelHandlerContext, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /**
     * Metrics holder for shuffle output that also acts as a channel-future listener:
     * each completed future is counted as a success or a failure and decrements the
     * connection gauge.
     *
     * <p>The counter and gauge fields are not assigned anywhere in this class; presumably
     * the metrics framework populates them from the annotations - confirm against the
     * registration site.
     */
    @Metrics(about = "Shuffle output metrics", context = "mapred")
    static class ShuffleMetrics implements ChannelFutureListener {

        @Metric({"Shuffle output in bytes"})
        MutableCounterLong shuffleOutputBytes;

        @Metric({"# of failed shuffle outputs"})
        MutableCounterInt shuffleOutputsFailed;

        // The spelling below is part of the emitted metric description and is kept as-is.
        @Metric({"# of succeeeded shuffle outputs"})
        MutableCounterInt shuffleOutputsOK;

        @Metric({"# of current shuffle connections"})
        MutableGaugeInt shuffleConnections;

        ShuffleMetrics() {
        }

        /**
         * Records the outcome of a finished channel operation.
         *
         * @param channelFuture the completed future; its success flag selects the counter to bump
         */
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            MutableCounterInt outcomeCounter = channelFuture.isSuccess()
                    ? this.shuffleOutputsOK
                    : this.shuffleOutputsFailed;
            outcomeCounter.incr();
            // The connection gauge drops regardless of the outcome.
            this.shuffleConnections.decr();
        }
    }

    /**
     * Forwards an attempt-directory registration to the shuffle handler instance held by
     * this object.
     *
     * @param attemptPathIdentifier identifier of the attempt being registered
     * @param attemptPathInfo path information associated with that attempt
     */
    @Override // org.apache.hadoop.hive.llap.shufflehandler.AttemptRegistrationListener
    public void registerAttemptDirs(AttemptPathIdentifier attemptPathIdentifier, AttemptPathInfo attemptPathInfo) {
        shuffle.registerAttemptDirs(attemptPathIdentifier, attemptPathInfo);
    }

    /**
     * Builds the handler from configuration: reads the shuffle settings, creates the Netty
     * server channel factory, the secret manager and the shuffle pipeline handler, and -
     * when enabled by {@code SHUFFLE_DIR_WATCHER_ENABLED} - attempts to create a
     * {@code DirWatcher}.
     *
     * <p>Failure to create the watcher is not fatal: it is logged (with its cause) and the
     * handler continues with {@code dirWatcher == null}.
     *
     * @param configuration source of all shuffle settings; also retained as {@code conf}
     */
    private ShuffleHandler(Configuration configuration) {
        this.conf = configuration;
        this.manageOsCache = configuration.getBoolean(SHUFFLE_MANAGE_OS_CACHE, true);
        this.readaheadLength = configuration.getInt(SHUFFLE_READAHEAD_BYTES, DEFAULT_SHUFFLE_READAHEAD_BYTES);
        this.maxShuffleConnections = configuration.getInt(MAX_SHUFFLE_CONNECTIONS, 0);

        // A configured value of 0 means "derive from the machine": twice the available processors.
        int configuredThreads = configuration.getInt(MAX_SHUFFLE_THREADS, 0);
        int workerThreads = configuredThreads == 0
                ? 2 * Runtime.getRuntime().availableProcessors()
                : configuredThreads;

        this.localDirs = configuration.getTrimmedStrings(SHUFFLE_HANDLER_LOCAL_DIRS);
        this.shuffleBufferSize = configuration.getInt(SHUFFLE_BUFFER_SIZE, DEFAULT_SHUFFLE_BUFFER_SIZE);
        this.shuffleTransferToAllowed = configuration.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED, !Shell.WINDOWS);
        this.selector = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setNameFormat("ShuffleHandler Netty Boss #%d").build()),
                Executors.newCachedThreadPool(
                        new ThreadFactoryBuilder().setNameFormat("ShuffleHandler Netty Worker #%d").build()),
                workerThreads);
        this.sslFileBufferSize = configuration.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
        this.connectionKeepAliveEnabled = configuration.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, false);
        // Both values are clamped to at least 1.
        this.connectionKeepAliveTimeOut = Math.max(1, configuration.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, 5));
        this.mapOutputMetaInfoCacheSize = Math.max(1, configuration.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE, DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
        this.userRsrc = new ConcurrentHashMap<>();
        this.secretManager = new JobTokenSecretManager();
        this.shuffle = new Shuffle(configuration);

        DirWatcher watcher = null;
        if (configuration.getBoolean(SHUFFLE_DIR_WATCHER_ENABLED, false)) {
            LOG.info("Attempting to start dirWatcher");
            try {
                watcher = new DirWatcher(this);
            } catch (IOException e) {
                // Keep the cause in the log so the reason the watcher is missing can be diagnosed.
                LOG.warn("Unable to start DirWatcher. Active scans disabled", e);
            }
        } else {
            LOG.info("DirWatcher disabled by config");
        }
        this.dirWatcher = watcher;
    }

    /**
     * Binds the shuffle server and, if a directory watcher exists, starts it.
     *
     * <p>The port is read from {@code SHUFFLE_PORT_CONFIG_KEY}; after binding, the port
     * actually assigned to the server channel is written back to the configuration and
     * passed to the pipeline's shuffle handler.
     *
     * @throws RuntimeException wrapping any exception raised while setting up or binding
     */
    public void start() throws Exception {
        ServerBootstrap bootstrap = new ServerBootstrap(selector);
        try {
            pipelineFact = new HttpPipelineFactory(conf);
            bootstrap.setPipelineFactory(pipelineFact);

            port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
            Channel serverChannel = bootstrap.bind(new InetSocketAddress(port));
            accepted.add(serverChannel);

            // Re-read the port from the bound channel and publish it.
            InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
            port = boundAddress.getPort();
            conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
            pipelineFact.SHUFFLE.setPort(port);

            if (dirWatcher != null) {
                dirWatcher.start();
            }
            LOG.info("LlapShuffleHandler listening on port " + port);
        } catch (Exception startFailure) {
            throw new RuntimeException(startFailure);
        }
    }

    /**
     * Creates and starts the singleton handler the first time it is called; every later
     * call returns immediately without doing anything.
     *
     * <p>NOTE(review): the init flag is set before construction and is not reset here, so a
     * failure in the constructor or in {@code start()} leaves later calls as no-ops -
     * confirm this is intended.
     *
     * @param configuration configuration handed to the new handler
     */
    public static void initializeAndStart(Configuration configuration) throws Exception {
        boolean alreadyInitializing = initing.getAndSet(true);
        if (!alreadyInitializing) {
            INSTANCE = new ShuffleHandler(configuration);
            INSTANCE.start();
            started.set(true);
        }
    }

    /**
     * Stops the singleton handler if one has been created; otherwise does nothing.
     */
    public static void shutdown() throws Exception {
        if (INSTANCE == null) {
            return;
        }
        INSTANCE.stop();
    }

    /**
     * Returns the singleton handler.
     *
     * @return the started handler instance
     * @throws IllegalStateException if the handler has not been marked as started
     */
    public static ShuffleHandler get() {
        boolean running = started.get();
        Preconditions.checkState(running, "ShuffleHandler must be started before invoking get");
        return INSTANCE;
    }

    /**
     * Serializes a single int into a byte buffer.
     *
     * @param i the value to write
     * @return a buffer wrapping the written bytes, from offset 0 up to the written length
     * @throws IOException if writing to the output buffer fails
     */
    public static ByteBuffer serializeMetaData(int i) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        out.writeInt(i);
        byte[] backing = out.getData();
        int written = out.getLength();
        // Wrap only the written prefix of the backing array.
        return ByteBuffer.wrap(backing, 0, written);
    }

    /**
     * Reads a single int from the given buffer.
     *
     * @param byteBuffer buffer to read from
     * @return the int read from the buffer
     * @throws IOException if the value cannot be read
     */
    public static int deserializeMetaData(ByteBuffer byteBuffer) throws IOException {
        ByteBuffer[] sources = {byteBuffer};
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(sources);
        return in.readInt();
    }

    /**
     * Serializes a job token into a byte buffer using the token's own {@code write} method.
     *
     * @param token the token to serialize
     * @return a buffer wrapping the written bytes, from offset 0 up to the written length
     * @throws IOException if the token cannot be written
     */
    public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> token) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        token.write(out);
        byte[] backing = out.getData();
        int written = out.getLength();
        // Wrap only the written prefix of the backing array.
        return ByteBuffer.wrap(backing, 0, written);
    }

    /**
     * Reconstructs a job token from the given buffer using the token's own
     * {@code readFields} method.
     *
     * @param byteBuffer buffer holding the serialized token
     * @return a new token populated from the buffer
     * @throws IOException if the token fields cannot be read
     */
    static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer byteBuffer) throws IOException {
        ByteBuffer[] sources = {byteBuffer};
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(sources);

        Token<JobTokenIdentifier> restored = new Token<>();
        restored.readFields(in);
        return restored;
    }

    /**
     * Returns the current value of the handler's port field.
     *
     * @return the port
     */
    public int getPort() {
        return port;
    }

    /**
     * Registers a DAG for an application.
     *
     * <p>The first registration of an application records its job token and user. A
     * registration with a different DAG id replaces the stored id. In either of those two
     * cases, and only if a directory watcher exists, every directory in {@code strArr} is
     * registered with the watcher. Re-registering the same DAG id is a no-op.
     *
     * <p>A directory that cannot be registered is logged (with the cause) and skipped; the
     * remaining directories are still processed.
     *
     * @param str application id string, used as the registration key
     * @param i DAG identifier
     * @param token job token recorded on first registration of the application
     * @param str2 user recorded with the token and passed to the watcher
     * @param strArr directories to register with the watcher
     */
    public void registerDag(String str, int i, Token<JobTokenIdentifier> token, String str2, String[] strArr) {
        Integer dagId = Integer.valueOf(i);
        Integer previousDagId = this.registeredApps.putIfAbsent(str, dagId);

        boolean firstRegistration = previousDagId == null;
        boolean dagChanged = !firstRegistration && !previousDagId.equals(dagId);

        if (firstRegistration) {
            recordJobShuffleInfo(str, str2, token);
        }
        if (dagChanged) {
            this.registeredApps.put(str, dagId);
        }

        if (!(firstRegistration || dagChanged) || this.dirWatcher == null) {
            return;
        }

        LOG.debug("Registering watches for AppDirs: appId={}, dagId={}", str, dagId);
        for (String appDir : strArr) {
            try {
                // NOTE(review): 300000L is passed through unchanged; presumably a duration in
                // milliseconds - confirm against DirWatcher.registerDagDir.
                this.dirWatcher.registerDagDir(appDir, str, i, str2, 300000L);
            } catch (IOException e) {
                // Best effort per directory, but keep the cause so failures can be diagnosed.
                LOG.warn("Unable to register dir: " + appDir + " with watcher", e);
            }
        }
    }

    /**
     * Unregisters a DAG.
     *
     * <p>If the DAG id currently stored for the application equals {@code i}, the
     * application's registration and its job shuffle info are removed. Independently of
     * that, the directory is unregistered from the watcher when one exists.
     *
     * @param str directory passed to the watcher for unregistration
     * @param str2 application id string, used as the registration key
     * @param i DAG identifier being unregistered
     */
    public void unregisterDag(String str, String str2, int i) {
        Integer registeredDagId = registeredApps.get(str2);
        boolean matchesRegistration = registeredDagId != null && registeredDagId.equals(Integer.valueOf(i));
        if (matchesRegistration) {
            registeredApps.remove(str2);
            removeJobShuffleInfo(str2);
        }

        if (dirWatcher == null) {
            return;
        }
        dirWatcher.unregisterDagDir(str, str2, i);
    }

    /**
     * Shuts the handler down in a fixed order: closes the accepted channels (waiting up to
     * ten seconds), releases the resources of the channel factory, destroys the pipeline
     * factory and finally stops the directory watcher. Components that are {@code null}
     * are skipped.
     */
    protected void stop() throws Exception {
        // Wait for at most 10 seconds for the accepted channels to close.
        accepted.close().awaitUninterruptibly(10L, TimeUnit.SECONDS);

        if (selector != null) {
            ServerBootstrap releaser = new ServerBootstrap(selector);
            releaser.releaseExternalResources();
        }

        if (pipelineFact != null) {
            pipelineFact.destroy();
        }

        if (dirWatcher != null) {
            dirWatcher.stop();
        }
    }

    /**
     * Returns the shuffle handler instance held by this object.
     *
     * @param configuration not used by this implementation
     * @return the existing {@code Shuffle} instance
     */
    protected Shuffle getShuffle(Configuration configuration) {
        return shuffle;
    }

    /**
     * Stores the user and the job token for an application.
     *
     * <p>The key used for both stores is the application id string with every occurrence
     * of "application" replaced by "job". An existing user entry for that key is kept.
     *
     * @param str application id string
     * @param str2 user to associate with the job key
     * @param token job token handed to the secret manager
     */
    private void addJobToken(String str, String str2, Token<JobTokenIdentifier> token) {
        final String jobKey = str.replace("application", "job");
        userRsrc.putIfAbsent(jobKey, str2);
        secretManager.addTokenForJob(jobKey, token);
        LOG.info("Added token for " + jobKey);
    }

    /**
     * Records the shuffle information for a job; currently this only adds the job token.
     *
     * @param str application id string
     * @param str2 user owning the job
     * @param token job token to record
     */
    private void recordJobShuffleInfo(String str, String str2, Token<JobTokenIdentifier> token) {
        this.addJobToken(str, str2, token);
    }

    /**
     * Removes the job token and the user entry recorded for an application.
     *
     * <p>{@code addJobToken} stores both entries under the id with "application" replaced
     * by "job", so the same transformation is applied here. Removing by the untransformed
     * id would never match the stored key and the entries would accumulate. An id that
     * does not contain "application" is used unchanged.
     *
     * @param str application id string (or an id already in "job" form)
     */
    private void removeJobShuffleInfo(String str) {
        String jobKey = str.replace("application", "job");
        this.secretManager.removeTokenForJob(jobKey);
        this.userRsrc.remove(jobKey);
    }

    /**
     * Builds the relative base location of a DAG's shuffle output for a job.
     *
     * <p>The job id must consist of exactly three parts separated by underscores; the second
     * and third parts are parsed as a long and an int and turned into an application id
     * string. The result has the form
     * {@code usercache/<user>/<APPCACHE_CONSTANT>/<applicationId>/<dagId>/output/}.
     *
     * @param str job id string with three underscore-separated parts
     * @param i DAG identifier
     * @param str2 user name
     * @return the relative base location, ending with a slash
     * @throws IllegalArgumentException if the job id does not have exactly three parts
     */
    public static String getBaseLocation(String str, int i, String str2) {
        String[] jobIdParts = str.split("_");
        Preconditions.checkArgument(jobIdParts.length == 3, "Invalid jobId. Expecting 3 parts");

        long timestampPart = Long.parseLong(jobIdParts[1]);
        int sequencePart = Integer.parseInt(jobIdParts[2]);
        String applicationIdString = ConverterUtils.toString(ApplicationId.newInstance(timestampPart, sequencePart));

        String userAppCachePrefix = "usercache/" + str2 + "/" + APPCACHE_CONSTANT + "/";
        return userAppCachePrefix + applicationIdString + "/" + i + "/output/";
    }
}
