package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher.class */
public class Fetcher extends CallableWithNdc<FetchResult> {
    private static final Logger LOG;
    private static final AtomicInteger fetcherIdGen;
    private final Configuration conf;
    private final int shufflePort;
    private CompressionCodec codec;
    private boolean ifileReadAhead;
    private int ifileReadAheadLength;
    private final JobTokenSecretManager jobTokenSecretMgr;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private final String logIdentifier;
    private final String localHostname;
    private final AtomicBoolean isShutDown;
    private final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    private String host;
    private int port;
    private int partition;
    private final Map<String, InputAttemptIdentifier> pathToAttemptMap;
    private List<InputAttemptIdentifier> remaining;
    private URL url;
    private volatile DataInputStream input;
    private HttpConnection httpConnection;
    private HttpConnection.HttpConnectionParams httpConnectionParams;
    private final boolean localDiskFetchEnabled;
    private final boolean sharedFetchEnabled;
    private final LocalDirAllocator localDirAllocator;
    private final Path lockPath;
    private final RawLocalFileSystem localFs;
    private long retryStartTime;
    private final boolean isDebugEnabled;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$CachingCallBack.class */
    public final class CachingCallBack {
        private CachingCallBack() {
        }

        public void cache(String str, InputAttemptIdentifier inputAttemptIdentifier, FetchedInput fetchedInput, long j, long j2) {
            try {
                Preconditions.checkArgument(Fetcher.this.partition == 0, "Partition == 0");
                String str2 = TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT + System.currentTimeMillis() + ".tmp";
                Path localPathForWrite = Fetcher.this.localDirAllocator.getLocalPathForWrite(Fetcher.getMapOutputFile(inputAttemptIdentifier.getPathComponent()), j, Fetcher.this.conf);
                TezSpillRecord tezSpillRecord = new TezSpillRecord(1);
                Path suffix = localPathForWrite.suffix(".index" + str2);
                if (Fetcher.this.localFs.exists(suffix)) {
                    Fetcher.LOG.warn("Found duplicate instance of input index file " + suffix);
                    return;
                }
                switch (fetchedInput.getType()) {
                    case DISK:
                        TezIndexRecord tezIndexRecord = new TezIndexRecord(0L, j2, j);
                        Fetcher.this.localFs.mkdirs(localPathForWrite.getParent());
                        Path suffix2 = localPathForWrite.suffix(str2);
                        Fetcher.this.localFs.copyFromLocalFile(((DiskFetchedInput) fetchedInput).getInputPath(), suffix2);
                        if (!Fetcher.this.localFs.rename(suffix2, localPathForWrite)) {
                            Fetcher.LOG.warn("Could not rename to cached file name " + localPathForWrite);
                            Fetcher.this.localFs.delete(suffix2, false);
                            return;
                        }
                        tezSpillRecord.putIndex(tezIndexRecord, 0);
                        tezSpillRecord.writeToFile(suffix, Fetcher.this.conf);
                        if (Fetcher.this.localFs.rename(suffix, localPathForWrite.suffix(".index"))) {
                            return;
                        }
                        Fetcher.this.localFs.delete(suffix, false);
                        Fetcher.this.localFs.delete(localPathForWrite, false);
                        Fetcher.LOG.warn("Could not rename the index file to " + localPathForWrite.suffix(".index"));
                        return;
                    default:
                        Fetcher.LOG.warn("Incorrect use of CachingCallback for " + inputAttemptIdentifier);
                        return;
                }
            } catch (IOException e) {
                Fetcher.LOG.warn("Cache threw an error " + e);
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$FetcherBuilder.class */
    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams httpConnectionParams, FetchedInputAllocator fetchedInputAllocator, ApplicationId applicationId, JobTokenSecretManager jobTokenSecretManager, String str, Configuration configuration, boolean z, String str2, int i) {
            this.fetcher = new Fetcher(fetcherCallback, httpConnectionParams, fetchedInputAllocator, applicationId, jobTokenSecretManager, str, configuration, null, null, null, z, false, str2, i);
        }

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams httpConnectionParams, FetchedInputAllocator fetchedInputAllocator, ApplicationId applicationId, JobTokenSecretManager jobTokenSecretManager, String str, Configuration configuration, RawLocalFileSystem rawLocalFileSystem, LocalDirAllocator localDirAllocator, Path path, boolean z, boolean z2, String str2, int i) {
            this.fetcher = new Fetcher(fetcherCallback, httpConnectionParams, fetchedInputAllocator, applicationId, jobTokenSecretManager, str, configuration, rawLocalFileSystem, localDirAllocator, path, z, z2, str2, i);
        }

        public FetcherBuilder setHttpConnectionParameters(HttpConnection.HttpConnectionParams httpConnectionParams) {
            this.fetcher.httpConnectionParams = httpConnectionParams;
            return this;
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec compressionCodec) {
            this.fetcher.codec = compressionCodec;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean z, int i) {
            this.fetcher.ifileReadAhead = z;
            this.fetcher.ifileReadAheadLength = i;
            return this;
        }

        public FetcherBuilder assignWork(String str, int i, int i2, List<InputAttemptIdentifier> list) {
            this.fetcher.host = str;
            this.fetcher.port = i;
            this.fetcher.partition = i2;
            this.fetcher.srcAttempts = list;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState(this.workAssigned, "Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$HostFetchResult.class */
    public static class HostFetchResult {
        private final FetchResult fetchResult;
        private final InputAttemptIdentifier[] failedInputs;
        private final boolean connectFailed;

        public HostFetchResult(FetchResult fetchResult, InputAttemptIdentifier[] inputAttemptIdentifierArr, boolean z) {
            this.fetchResult = fetchResult;
            this.failedInputs = inputAttemptIdentifierArr;
            this.connectFailed = z;
        }
    }

    private Fetcher(FetcherCallback fetcherCallback, HttpConnection.HttpConnectionParams httpConnectionParams, FetchedInputAllocator fetchedInputAllocator, ApplicationId applicationId, JobTokenSecretManager jobTokenSecretManager, String str, Configuration configuration, RawLocalFileSystem rawLocalFileSystem, LocalDirAllocator localDirAllocator, Path path, boolean z, boolean z2, String str2, int i) {
        this.ifileReadAhead = true;
        this.ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
        this.isShutDown = new AtomicBoolean(false);
        this.retryStartTime = 0L;
        this.isDebugEnabled = LOG.isDebugEnabled();
        this.fetcherCallback = fetcherCallback;
        this.inputManager = fetchedInputAllocator;
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.appId = applicationId;
        this.pathToAttemptMap = new HashMap();
        this.httpConnectionParams = httpConnectionParams;
        this.conf = configuration;
        this.localDiskFetchEnabled = z;
        this.sharedFetchEnabled = z2;
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
        this.logIdentifier = " fetcher [" + str + "] " + this.fetcherIdentifier;
        this.localFs = rawLocalFileSystem;
        this.localDirAllocator = localDirAllocator;
        this.lockPath = path;
        this.localHostname = str2;
        this.shufflePort = i;
        try {
            if (this.sharedFetchEnabled) {
                this.localFs.mkdirs(this.lockPath);
            }
        } catch (Exception e) {
            LOG.warn("Error initializing local dirs for shared transfer " + e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
    public FetchResult m14callInternal() throws Exception {
        boolean z = this.sharedFetchEnabled && this.localDiskFetchEnabled;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.srcAttempts);
        }
        for (InputAttemptIdentifier inputAttemptIdentifier : this.srcAttempts) {
            this.pathToAttemptMap.put(inputAttemptIdentifier.getPathComponent(), inputAttemptIdentifier);
            z &= inputAttemptIdentifier.isShared();
        }
        if (z) {
            Preconditions.checkArgument(this.partition == 0, "Shared fetches cannot be done for partitioned input- partition is non-zero (%d)", new Object[]{Integer.valueOf(this.partition)});
        }
        this.remaining = new LinkedList(this.srcAttempts);
        HostFetchResult doSharedFetch = (this.localDiskFetchEnabled && this.host.equals(this.localHostname) && this.port == this.shufflePort) ? setupLocalDiskFetch() : z ? doSharedFetch() : doHttpFetch();
        if (doSharedFetch.failedInputs != null && doSharedFetch.failedInputs.length > 0) {
            if (this.isShutDown.get()) {
                LOG.info("Ignoring failed fetch reports for " + doSharedFetch.failedInputs.length + " inputs since the fetcher has already been stopped");
            } else {
                LOG.warn("copyInputs failed for tasks " + Arrays.toString(doSharedFetch.failedInputs));
                for (InputAttemptIdentifier inputAttemptIdentifier2 : doSharedFetch.failedInputs) {
                    this.fetcherCallback.fetchFailed(this.host, inputAttemptIdentifier2, doSharedFetch.connectFailed);
                }
            }
        }
        shutdown();
        if (doSharedFetch.failedInputs == null && !this.remaining.isEmpty()) {
            if (!z) {
                throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
            }
            LOG.info("Shared fetch failed to return " + this.remaining.size() + " inputs on this try");
        }
        return doSharedFetch.fetchResult;
    }

    private int findInputs() throws IOException {
        int i = 0;
        Iterator<InputAttemptIdentifier> it = this.srcAttempts.iterator();
        while (it.hasNext()) {
            try {
                if (getShuffleInputFileName(it.next().getPathComponent(), ".index") != null) {
                    i++;
                }
            } catch (DiskChecker.DiskErrorException e) {
            }
        }
        return i;
    }

    private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
        File pathToFile = this.localFs.pathToFile(new Path(this.lockPath, this.host + ".lock"));
        if (!pathToFile.createNewFile() && !pathToFile.exists()) {
            return null;
        }
        FileChannel channel = new RandomAccessFile(pathToFile, "rws").getChannel();
        FileLock tryLock = channel.tryLock(0L, Long.MAX_VALUE, false);
        if (tryLock != null) {
            return tryLock;
        }
        channel.close();
        return null;
    }

    private void releaseLock(FileLock fileLock) throws IOException {
        if (fileLock == null || !fileLock.isValid()) {
            return;
        }
        FileChannel channel = fileLock.channel();
        fileLock.release();
        channel.close();
    }

    /* JADX WARN: Removed duplicated region for block: B:40:0x0160  */
    /* JADX WARN: Removed duplicated region for block: B:42:0x0181  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected org.apache.tez.runtime.library.common.shuffle.Fetcher.HostFetchResult doSharedFetch() throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 390
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.tez.runtime.library.common.shuffle.Fetcher.doSharedFetch():org.apache.tez.runtime.library.common.shuffle.Fetcher$HostFetchResult");
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch() {
        return doHttpFetch(null);
    }

    private HostFetchResult setupConnection(List<InputAttemptIdentifier> list) {
        try {
            this.url = ShuffleUtils.constructInputURL(ShuffleUtils.constructBaseURIForShuffleHandler(this.host, this.port, this.partition, this.appId.toString(), this.httpConnectionParams.isSSLShuffleEnabled()).toString(), list, this.httpConnectionParams.getKeepAlive());
            this.httpConnection = new HttpConnection(this.url, this.httpConnectionParams, this.logIdentifier, this.jobTokenSecretMgr);
            this.httpConnection.connect();
            if (this.isShutDown.get()) {
                shutdownInternal();
                LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
            }
            try {
                this.input = this.httpConnection.getInputStream();
                this.httpConnection.validate();
                return null;
            } catch (IOException e) {
                if (this.isShutDown.get()) {
                    LOG.info("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage());
                    return null;
                }
                InputAttemptIdentifier inputAttemptIdentifier = list.get(0);
                LOG.warn("Fetch Failure from host while connecting: " + this.host + ", attempt: " + inputAttemptIdentifier + " Informing ShuffleManager: ", e);
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), new InputAttemptIdentifier[]{inputAttemptIdentifier}, false);
            }
        } catch (IOException e2) {
            InputAttemptIdentifier[] inputAttemptIdentifierArr = null;
            if (this.isShutDown.get()) {
                LOG.info("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e2.getClass().getName() + ", Message: " + e2.getMessage());
            } else {
                inputAttemptIdentifierArr = (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), inputAttemptIdentifierArr, true);
        }
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch(CachingCallBack cachingCallBack) {
        HostFetchResult hostFetchResult = setupConnection(this.srcAttempts);
        if (hostFetchResult != null) {
            return hostFetchResult;
        }
        if (this.isShutDown.get()) {
            shutdownInternal();
            LOG.info("Detected fetcher has been shutdown after opening stream. Returning");
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
        }
        InputAttemptIdentifier[] inputAttemptIdentifierArr = null;
        while (!this.remaining.isEmpty() && inputAttemptIdentifierArr == null) {
            if (this.isShutDown.get()) {
                shutdownInternal(true);
                LOG.info("Fetcher already shutdown. Aborting queued fetches for " + this.remaining.size() + " inputs");
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
            }
            try {
                inputAttemptIdentifierArr = fetchInputs(this.input, cachingCallBack);
            } catch (FetcherReadTimeoutException e) {
                shutdownInternal(true);
                if (this.isShutDown.get()) {
                    LOG.info("Fetcher already shutdown. Aborting reconnection and queued fetches for " + this.remaining.size() + " inputs");
                    return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), null, false);
                }
                if (setupConnection(new LinkedList(this.remaining)) != null) {
                    break;
                }
            }
        }
        if (this.isShutDown.get() && inputAttemptIdentifierArr != null && inputAttemptIdentifierArr.length > 0) {
            LOG.info("Fetcher already shutdown. Not reporting fetch failures for: " + inputAttemptIdentifierArr.length + " failed inputs");
            inputAttemptIdentifierArr = null;
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), inputAttemptIdentifierArr, false);
    }

    @VisibleForTesting
    protected HostFetchResult setupLocalDiskFetch() {
        return doLocalDiskFetch(true);
    }

    @VisibleForTesting
    private HostFetchResult doLocalDiskFetch(boolean z) {
        Iterator<InputAttemptIdentifier> it = this.remaining.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (this.isShutDown.get()) {
                LOG.info("Already shutdown. Skipping fetch for " + this.remaining.size() + " inputs");
                break;
            }
            InputAttemptIdentifier next = it.next();
            long currentTimeMillis = System.currentTimeMillis();
            LocalDiskFetchedInput localDiskFetchedInput = null;
            try {
                TezIndexRecord tezIndexRecord = getTezIndexRecord(next);
                localDiskFetchedInput = new LocalDiskFetchedInput(tezIndexRecord.getStartOffset(), tezIndexRecord.getRawLength(), tezIndexRecord.getPartLength(), next, getShuffleInputFileName(next.getPathComponent(), null), this.conf, new FetchedInputCallback() { // from class: org.apache.tez.runtime.library.common.shuffle.Fetcher.1
                    @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback
                    public void fetchComplete(FetchedInput fetchedInput) {
                    }

                    @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback
                    public void fetchFailed(FetchedInput fetchedInput) {
                    }

                    @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback
                    public void freeResources(FetchedInput fetchedInput) {
                    }
                });
                LOG.info("fetcher about to shuffle output of srcAttempt (direct disk)" + next + " decomp: " + tezIndexRecord.getRawLength() + " len: " + tezIndexRecord.getPartLength() + " to " + localDiskFetchedInput.getType());
                this.fetcherCallback.fetchSucceeded(this.host, next, localDiskFetchedInput, tezIndexRecord.getPartLength(), tezIndexRecord.getRawLength(), System.currentTimeMillis() - currentTimeMillis);
                it.remove();
            } catch (IOException e) {
                cleanupFetchedInput(localDiskFetchedInput);
                if (this.isShutDown.get()) {
                    LOG.info("Already shutdown. Ignoring Local Fetch Failure for " + next + " from host " + this.host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
                    break;
                }
                LOG.warn("Failed to shuffle output of " + next + " from " + this.host + "(local fetch)", e);
            }
        }
        InputAttemptIdentifier[] inputAttemptIdentifierArr = null;
        if (z && this.remaining.size() > 0) {
            if (this.isShutDown.get()) {
                LOG.info("Already shutdown, not reporting fetch failures for: " + this.remaining.size() + " remaining inputs");
            } else {
                inputAttemptIdentifierArr = (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.remaining), inputAttemptIdentifierArr, false);
    }

    @VisibleForTesting
    protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
        return new TezSpillRecord(getShuffleInputFileName(inputAttemptIdentifier.getPathComponent(), ".index"), this.conf).getIndex(this.partition);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final String getMapOutputFile(String str) {
        return "output/" + str + "/file.out";
    }

    @VisibleForTesting
    protected Path getShuffleInputFileName(String str, String str2) throws IOException {
        return this.localDirAllocator.getLocalPathToRead(getMapOutputFile(str) + (str2 != null ? str2 : TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT), this.conf);
    }

    public void shutdown() {
        if (this.isShutDown.getAndSet(true)) {
            return;
        }
        shutdownInternal();
    }

    private void shutdownInternal() {
        shutdownInternal(false);
    }

    private void shutdownInternal(boolean z) {
        synchronized (this.isShutDown) {
            try {
                if (this.httpConnection != null) {
                    this.httpConnection.cleanup(z);
                }
            } catch (IOException e) {
                LOG.info("Exception while shutting down fetcher on " + this.logIdentifier + " : " + e.getMessage());
                if (LOG.isDebugEnabled()) {
                    LOG.debug(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT, e);
                }
            }
        }
    }

    private InputAttemptIdentifier[] fetchInputs(DataInputStream dataInputStream, CachingCallBack cachingCallBack) throws FetcherReadTimeoutException {
        InputAttemptIdentifier inputAttemptIdentifier = null;
        long j = -1;
        long j2 = -1;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                ShuffleHeader shuffleHeader = new ShuffleHeader();
                shuffleHeader.readFields(dataInputStream);
                String mapId = shuffleHeader.getMapId();
                inputAttemptIdentifier = this.pathToAttemptMap.get(mapId);
                j2 = shuffleHeader.getCompressedLength();
                j = shuffleHeader.getUncompressedLength();
                if (!verifySanity(j2, j, shuffleHeader.getPartition(), inputAttemptIdentifier, mapId)) {
                    if (this.isShutDown.get()) {
                        LOG.info("Already shutdown. Ignoring verification failure.");
                        return null;
                    }
                    if (inputAttemptIdentifier == null) {
                        LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
                        inputAttemptIdentifier = getNextRemainingAttempt();
                    }
                    if ($assertionsDisabled || inputAttemptIdentifier != null) {
                        return new InputAttemptIdentifier[]{inputAttemptIdentifier};
                    }
                    throw new AssertionError();
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("header: " + inputAttemptIdentifier + ", len: " + j2 + ", decomp len: " + j);
                }
                FetchedInput allocate = (!inputAttemptIdentifier.isShared() || cachingCallBack == null) ? this.inputManager.allocate(j, j2, inputAttemptIdentifier) : this.inputManager.allocateType(FetchedInput.Type.DISK, j, j2, inputAttemptIdentifier);
                LOG.info("fetcher about to shuffle output of srcAttempt " + allocate.getInputAttemptIdentifier() + " decomp: " + j + " len: " + j2 + " to " + allocate.getType());
                if (allocate.getType() == FetchedInput.Type.MEMORY) {
                    ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) allocate).getBytes(), dataInputStream, (int) j, (int) j2, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, allocate.getInputAttemptIdentifier().toString());
                } else {
                    if (allocate.getType() != FetchedInput.Type.DISK) {
                        throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + allocate);
                    }
                    ShuffleUtils.shuffleToDisk(((DiskFetchedInput) allocate).getOutputStream(), this.host + ":" + this.port, dataInputStream, j2, LOG, allocate.getInputAttemptIdentifier().toString());
                }
                if (inputAttemptIdentifier.isShared() && cachingCallBack != null) {
                    cachingCallBack.cache(this.host, inputAttemptIdentifier, allocate, j2, j);
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                this.retryStartTime = 0L;
                this.fetcherCallback.fetchSucceeded(this.host, inputAttemptIdentifier, allocate, j2, j, currentTimeMillis2 - currentTimeMillis);
                this.remaining.remove(inputAttemptIdentifier);
                return null;
            } catch (IllegalArgumentException e) {
                if (this.isShutDown.get()) {
                    LOG.info("Already shutdown. Ignoring badId error with message: " + e.getMessage());
                    return null;
                }
                LOG.warn("Invalid src id ", e);
                return (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
            }
        } catch (IOException e2) {
            if (this.isShutDown.get()) {
                cleanupFetchedInput(null);
                LOG.info("Already shutdown. Ignoring exception during fetch " + e2.getClass().getName() + ", Message: " + e2.getMessage());
                return null;
            }
            if (shouldRetry(inputAttemptIdentifier, e2)) {
                cleanupFetchedInput(null);
                throw new FetcherReadTimeoutException(e2);
            }
            if (inputAttemptIdentifier == null || 0 == 0) {
                LOG.info("fetcher failed to read map header" + inputAttemptIdentifier + " decomp: " + j + ", " + j2, e2);
                cleanupFetchedInput(null);
                return inputAttemptIdentifier == null ? (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]) : new InputAttemptIdentifier[]{inputAttemptIdentifier};
            }
            LOG.warn("Failed to shuffle output of " + inputAttemptIdentifier + " from " + this.host, e2);
            cleanupFetchedInput(null);
            return new InputAttemptIdentifier[]{inputAttemptIdentifier};
        }
    }

    private void cleanupFetchedInput(FetchedInput fetchedInput) {
        if (fetchedInput != null) {
            try {
                fetchedInput.abort();
            } catch (IOException e) {
                LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
            }
        }
    }

    private boolean shouldRetry(InputAttemptIdentifier inputAttemptIdentifier, IOException iOException) {
        if (!(iOException instanceof SocketTimeoutException)) {
            return false;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (this.retryStartTime == 0) {
            this.retryStartTime = currentTimeMillis;
        }
        if (currentTimeMillis - this.retryStartTime < this.httpConnectionParams.getReadTimeout()) {
            LOG.warn("Shuffle output from " + inputAttemptIdentifier + " failed, retry it.");
            return true;
        }
        LOG.warn("Timeout for copying MapOutput with retry on host " + this.host + "after " + this.httpConnectionParams.getReadTimeout() + "milliseconds.");
        return false;
    }

    private boolean verifySanity(long j, long j2, int i, InputAttemptIdentifier inputAttemptIdentifier, String str) {
        if (j < 0 || j2 < 0) {
            LOG.warn(" invalid lengths in input header -> headerPathComponent: " + str + ", nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier + " len: " + j + ", decomp len: " + j2);
            return false;
        }
        if (i != this.partition) {
            LOG.warn(" data for the wrong reduce -> headerPathComponent: " + str + "nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier + " len: " + j + " decomp len: " + j2 + " for reduce " + i);
            return false;
        }
        if (this.remaining.contains(inputAttemptIdentifier)) {
            return true;
        }
        LOG.warn("Invalid input. Received output for headerPathComponent: " + str + "nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier);
        return false;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.remaining.size() > 0) {
            return this.remaining.iterator().next();
        }
        return null;
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return obj != null && getClass() == obj.getClass() && this.fetcherIdentifier == ((Fetcher) obj).fetcherIdentifier;
    }

    static {
        $assertionsDisabled = !Fetcher.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(Fetcher.class);
        fetcherIdGen = new AtomicInteger(0);
    }
}
