package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.class */
public class FetcherOrderedGrouped extends Thread {
    private static final Logger LOG;
    private final Configuration conf;
    private final boolean localDiskFetchEnabled;
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
    private final TezCounter connectionErrs;
    private final TezCounter ioErrs;
    private final TezCounter wrongLengthErrs;
    private final TezCounter badIdErrs;
    private final TezCounter wrongMapErrs;
    private final TezCounter wrongReduceErrs;
    private final MergeManager merger;
    private final ShuffleScheduler scheduler;
    private final ShuffleClientMetrics metrics;
    private final Shuffle shuffle;
    private final int id;
    private final String logIdentifier;
    private final String localShuffleHostPort;
    private static int nextId;
    private final CompressionCodec codec;
    private final JobTokenSecretManager jobTokenSecretManager;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    private LinkedList<InputAttemptIdentifier> remaining;
    volatile HttpURLConnection connection;
    volatile DataInputStream input;
    HttpConnection httpConnection;
    HttpConnection.HttpConnectionParams httpConnectionParams;
    private static InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY;
    static final /* synthetic */ boolean $assertionsDisabled;
    private int currentPartition = -1;

    @VisibleForTesting
    volatile boolean stopped = false;
    private long retryStartTime = 0;
    private Object cleanupLock = new Object();

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped$ShuffleErrors.class */
    /**
     * Names of the counters this fetcher registers in the "Shuffle Errors" counter group.
     * The constructor looks each counter up by the constant's {@code toString()} value, so
     * renaming a constant renames the corresponding counter.
     */
    private enum ShuffleErrors {
        // Incremented on I/O failures while connecting, reserving space or copying data.
        IO_ERROR,
        // Incremented when a shuffle header carries a negative compressed or uncompressed length.
        WRONG_LENGTH,
        // Incremented when the header's map id lacks the expected prefix or cannot be parsed.
        BAD_ID,
        // Incremented when the received output is not among the attempts still expected.
        WRONG_MAP,
        // Incremented when establishing the connection to the source host fails.
        CONNECTION,
        // Incremented when the header's partition differs from the partition being fetched.
        WRONG_REDUCE
    }

    /**
     * Creates a daemon fetcher thread bound to the given scheduler, merger and shuffle.
     *
     * @param httpConnectionParams  parameters used for every HTTP connection this fetcher opens
     * @param shuffleScheduler      source of hosts/attempts to fetch and sink for success/failure reports
     * @param mergeManager          reserves space for fetched map outputs
     * @param shuffleClientMetrics  busy/free and success/failure metrics
     * @param shuffle               receives unexpected exceptions raised by the fetch loop
     * @param jobTokenSecretManager passed to each {@link HttpConnection}
     * @param z                     whether IFile read-ahead is enabled
     * @param i                     IFile read-ahead length
     * @param compressionCodec      codec used when shuffling to memory; may be {@code null}
     * @param inputContext          provides the counters and the source vertex name
     * @param configuration         configuration used when resolving local shuffle files
     * @param z2                    whether outputs on the local host may be read directly from disk
     * @param str                   local shuffle host name
     * @param i2                    local shuffle port
     * @throws IOException declared by the original signature
     */
    public FetcherOrderedGrouped(HttpConnection.HttpConnectionParams httpConnectionParams, ShuffleScheduler shuffleScheduler, MergeManager mergeManager, ShuffleClientMetrics shuffleClientMetrics, Shuffle shuffle, JobTokenSecretManager jobTokenSecretManager, boolean z, int i, CompressionCodec compressionCodec, InputContext inputContext, Configuration configuration, boolean z2, String str, int i2) throws IOException {
        // Fetchers must never keep the JVM alive on their own.
        setDaemon(true);

        // Collaborators.
        this.scheduler = shuffleScheduler;
        this.merger = mergeManager;
        this.metrics = shuffleClientMetrics;
        this.shuffle = shuffle;
        this.jobTokenSecretManager = jobTokenSecretManager;
        this.httpConnectionParams = httpConnectionParams;
        this.conf = configuration;

        // Each fetcher takes the next value of the shared sequence as its id.
        this.id = ++nextId;

        // Error counters, all registered in the same counter group.
        this.ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());

        // Data-path settings.
        this.ifileReadAhead = z;
        this.ifileReadAheadLength = i;
        this.codec = compressionCodec;
        this.localDiskFetchEnabled = z2;
        this.localShuffleHostPort = str + ":" + i2;

        // The thread name doubles as the identifier used in log messages.
        this.logIdentifier = "fetcher [" + TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + "] #" + this.id;
        setName(this.logIdentifier);
    }

    /**
     * Performs one fetch cycle: waits for the merger, obtains a host from the scheduler and
     * fetches that host's outputs either from local disk or over HTTP.
     *
     * <p>The current connection is cleaned up and the host is handed back to the scheduler
     * whether the fetch succeeds or throws, so a failing fetch cannot leave the host marked
     * busy.
     *
     * @throws InterruptedException if the thread is interrupted while waiting or fetching
     * @throws IOException          if the fetch fails with an I/O error
     */
    @VisibleForTesting
    protected void fetchNext() throws InterruptedException, IOException {
        // Declared outside the try so the finally block can see which host (if any) was taken.
        MapHost host = null;
        try {
            this.merger.waitForInMemoryMerge();
            this.merger.waitForShuffleToMergeMemory();
            host = this.scheduler.getHost();
            this.metrics.threadBusy();
            String hostIdentifier = host.getHostIdentifier();
            if (this.localDiskFetchEnabled && hostIdentifier.equals(this.localShuffleHostPort)) {
                setupLocalDiskFetch(host);
            } else {
                copyFromHost(host);
            }
        } finally {
            cleanupCurrentConnection(false);
            // Only release what was actually acquired; getHost() itself may have thrown.
            if (host != null) {
                this.scheduler.freeHost(host);
                this.metrics.threadFree();
            }
        }
    }

    /**
     * Thread body: runs fetch cycles until the fetcher is stopped or the thread is interrupted.
     * An interrupt restores the interrupt flag and ends the loop; any other throwable is
     * reported to the owning shuffle and also ends the loop.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                // Drop any attempt list left over from the previous cycle.
                this.remaining = null;
                fetchNext();
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        } catch (Throwable failure) {
            this.shuffle.reportException(failure);
        }
    }

    /**
     * Stops this fetcher: marks it stopped, interrupts the thread, tears down the current
     * connection and waits up to five seconds for the thread to finish.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt flag is restored
     * and a warning is logged; the interruption is not rethrown by this method body.
     *
     * @throws InterruptedException declared by the signature
     */
    public void shutDown() throws InterruptedException {
        this.stopped = true;
        interrupt();
        cleanupCurrentConnection(true);

        final long joinTimeoutMillis = 5000L;
        try {
            join(joinTimeoutMillis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            LOG.warn("Got interrupt while joining " + getName());
        }
    }

    /**
     * Cleans up the current HTTP connection, if there is one, while holding the cleanup lock.
     * I/O errors raised by the cleanup are logged and otherwise ignored.
     *
     * @param z forwarded unchanged to {@code HttpConnection.cleanup}
     */
    private void cleanupCurrentConnection(boolean z) {
        synchronized (this.cleanupLock) {
            if (this.httpConnection == null) {
                return;
            }
            try {
                this.httpConnection.cleanup(z);
            } catch (IOException cleanupFailure) {
                String prefix = "Exception while shutting down fetcher " + this.logIdentifier;
                if (LOG.isDebugEnabled()) {
                    // Full stack trace only when debugging.
                    LOG.debug(prefix, cleanupFailure);
                } else {
                    LOG.info(prefix + ": " + cleanupFailure.getMessage());
                }
            }
        }
    }

    /**
     * Fetches over HTTP every map output the scheduler currently lists for {@code mapHost}.
     *
     * <p>Outputs are copied one at a time until none remain or a copy reports failures. A
     * read timeout closes the connection and, unless the fetcher has been stopped, opens a
     * new one for the attempts still outstanding. Failed attempts are reported to the
     * scheduler unless the fetcher has been stopped.
     *
     * <p>Whatever is still in {@code remaining} when this method exits, normally or through
     * an exception, is put back to the scheduler exactly once by the {@code finally} block.
     *
     * @param mapHost host to fetch from
     * @throws IOException if the loop ended without failures but outputs are still outstanding
     */
    @VisibleForTesting
    protected void copyFromHost(MapHost mapHost) throws IOException {
        // The retry window is tracked per host.
        this.retryStartTime = 0L;
        List<InputAttemptIdentifier> mapsForHost = this.scheduler.getMapsForHost(mapHost);
        this.currentPartition = mapHost.getPartitionId();
        if (mapsForHost.size() == 0) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetcher " + this.id + " going to fetch from " + mapHost + " for: " + mapsForHost + ", partitionId: " + this.currentPartition);
        }
        this.remaining = new LinkedList<>(mapsForHost);
        try {
            if (!setupConnection(mapHost, mapsForHost)) {
                if (this.stopped) {
                    cleanupCurrentConnection(true);
                }
                // Remaining outputs are handed back by the finally block.
                return;
            }

            // Non-null once a copy has failed (or asked to stop); ends the loop.
            InputAttemptIdentifier[] failedTasks = null;
            while (!this.remaining.isEmpty() && failedTasks == null) {
                try {
                    failedTasks = copyMapOutput(mapHost, this.input);
                } catch (FetcherReadTimeoutException e) {
                    // The stream timed out: drop the connection and try to reconnect.
                    cleanupCurrentConnection(true);
                    if (this.stopped) {
                        LOG.info("Not re-establishing connection since Fetcher has been stopped");
                        return;
                    }
                    if (!setupConnection(mapHost, new LinkedList<InputAttemptIdentifier>(this.remaining))) {
                        if (this.stopped) {
                            cleanupCurrentConnection(true);
                            LOG.info("Not reporting connection re-establishment failure since fetcher is stopped");
                            return;
                        }
                        failedTasks = new InputAttemptIdentifier[]{getNextRemainingAttempt()};
                    }
                }
            }

            if (failedTasks != null && failedTasks.length > 0) {
                if (this.stopped) {
                    LOG.info("Ignoring copyMapOutput failures for tasks: " + Arrays.toString(failedTasks) + " since Fetcher has been stopped");
                } else {
                    LOG.warn("copyMapOutput failed for tasks " + Arrays.toString(failedTasks));
                    for (InputAttemptIdentifier failed : failedTasks) {
                        this.scheduler.copyFailed(failed, mapHost, true, false);
                    }
                }
            }

            cleanupCurrentConnection(false);

            // Sanity check: without a reported failure nothing should be left over.
            if (failedTasks == null && !this.remaining.isEmpty()) {
                throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
            }
        } finally {
            putBackRemainingMapOutputs(mapHost);
        }
    }

    /**
     * Opens and validates an HTTP connection to {@code mapHost} for the given attempts and
     * stores the resulting input stream in {@code input}.
     *
     * <p>On an I/O failure (and if the fetcher has not been stopped) every attempt in
     * {@code remaining} is reported to the scheduler as failed. The report distinguishes a
     * failure to connect from a failure that happened after {@code connect()} had already
     * returned (obtaining the stream or validating the reply): only the former increments
     * the connection-error counter.
     *
     * @param mapHost host to connect to
     * @param list    attempts to request in the URL
     * @return {@code true} if the connection is established and validated; {@code false} if
     *         the fetcher was stopped or the connection failed
     * @throws IOException declared by the signature; I/O errors raised here are handled internally
     */
    @VisibleForTesting
    boolean setupConnection(MapHost mapHost, List<InputAttemptIdentifier> list) throws IOException {
        // Set once connect() has returned without throwing, so the handler below can tell
        // a connect failure from a later stream/validation failure.
        // NOTE(review): if HttpConnection.connect() reports success through a return value,
        // prefer assigning that value here — confirm against HttpConnection.
        boolean connectSucceeded = false;
        try {
            this.httpConnection = new HttpConnection(ShuffleUtils.constructInputURL(mapHost.getBaseUrl(), list, this.httpConnectionParams.getKeepAlive()), this.httpConnectionParams, this.logIdentifier, this.jobTokenSecretManager);
            this.httpConnection.connect();
            connectSucceeded = true;
            if (this.stopped) {
                LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
                return false;
            }
            this.input = this.httpConnection.getInputStream();
            this.httpConnection.validate();
            return true;
        } catch (IOException e) {
            if (this.stopped) {
                LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
                return false;
            }
            this.ioErrs.increment(1L);
            if (!connectSucceeded) {
                LOG.warn("Failed to connect to " + mapHost + " with " + this.remaining.size() + " inputs", e);
                this.connectionErrs.increment(1L);
            } else {
                LOG.warn("Failed to verify reply after connecting to " + mapHost + " with " + this.remaining.size() + " inputs pending", e);
            }
            // The error is not tied to one specific attempt, so every outstanding attempt
            // on this host is reported as failed.
            for (InputAttemptIdentifier left : this.remaining) {
                this.scheduler.copyFailed(left, mapHost, connectSucceeded, !connectSucceeded);
            }
            return false;
        }
    }

    /**
     * Returns every attempt still in {@code remaining} to the scheduler. All attempts after
     * the first are put back in list order, and the first attempt is put back last.
     *
     * @param mapHost host the attempts belong to
     */
    @VisibleForTesting
    protected void putBackRemainingMapOutputs(MapHost mapHost) {
        Iterator<InputAttemptIdentifier> pending = this.remaining.iterator();

        // Hold the head of the list aside; it is returned after all the others.
        InputAttemptIdentifier head = pending.hasNext() ? pending.next() : null;

        while (pending.hasNext()) {
            this.scheduler.putBackKnownMapOutput(mapHost, pending.next());
        }

        if (head != null) {
            this.scheduler.putBackKnownMapOutput(mapHost, head);
        }
    }

    /**
     * Reads one shuffle header from the stream, validates it, reserves space with the merger
     * and copies the map output to memory or disk.
     *
     * <p>Only a failure of {@code merger.reserve} is treated as a local error and reported
     * through {@code scheduler.reportLocalError}. I/O errors raised while reading the header
     * or the data are handled by the outer handler, which aborts the reserved output,
     * requests a reconnect for retryable timeouts and otherwise returns the failed attempt(s).
     *
     * @param mapHost         host the stream is connected to
     * @param dataInputStream stream positioned at the next shuffle header
     * @return {@code null} when the output was copied successfully; an empty array when the
     *         caller should stop fetching without reporting failures (merger asked to wait,
     *         fetcher stopped, or local error already reported); otherwise the attempts to
     *         report as failed
     * @throws FetcherReadTimeoutException if a retryable read timeout occurred and the caller
     *                                     should re-establish the connection
     */
    protected InputAttemptIdentifier[] copyMapOutput(MapHost mapHost, DataInputStream dataInputStream) throws FetcherReadTimeoutException {
        MapOutput mapOutput = null;
        InputAttemptIdentifier inputAttemptIdentifier = null;
        long j = -1;   // uncompressed length from the header
        long j2 = -1;  // compressed length from the header
        try {
            long currentTimeMillis = System.currentTimeMillis();
            try {
                ShuffleHeader shuffleHeader = new ShuffleHeader();
                shuffleHeader.readFields(dataInputStream);
                if (!shuffleHeader.mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
                    if (this.stopped) {
                        LOG.info("Already shutdown. Ignoring invalid map id error");
                        return EMPTY_ATTEMPT_ID_ARRAY;
                    }
                    this.badIdErrs.increment(1L);
                    LOG.warn("Invalid map id: " + shuffleHeader.mapId + ", expected to start with " + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + shuffleHeader.forReduce);
                    return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
                }
                inputAttemptIdentifier = this.scheduler.getIdentifierForFetchedOutput(shuffleHeader.mapId, shuffleHeader.forReduce);
                j2 = shuffleHeader.compressedLength;
                j = shuffleHeader.uncompressedLength;
                if (!verifySanity(j2, j, shuffleHeader.forReduce, this.remaining, inputAttemptIdentifier)) {
                    if (this.stopped) {
                        LOG.info("Already stopped. Ignoring verification failure.");
                        return EMPTY_ATTEMPT_ID_ARRAY;
                    }
                    if (inputAttemptIdentifier == null) {
                        LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
                        inputAttemptIdentifier = getNextRemainingAttempt();
                    }
                    if ($assertionsDisabled || inputAttemptIdentifier != null) {
                        return new InputAttemptIdentifier[]{inputAttemptIdentifier};
                    }
                    throw new AssertionError();
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("header: " + inputAttemptIdentifier + ", len: " + j2 + ", decomp len: " + j);
                }

                // Only the reservation is a local concern; keep this handler scoped to it so
                // stream errors below reach the outer handler (abort + retry/failure report).
                try {
                    mapOutput = this.merger.reserve(inputAttemptIdentifier, j, j2, this.id);
                } catch (IOException e) {
                    if (this.stopped) {
                        LOG.info("Already stopped. Ignoring error from merger.reserve");
                    } else {
                        this.ioErrs.increment(1L);
                        this.scheduler.reportLocalError(e);
                    }
                    return EMPTY_ATTEMPT_ID_ARRAY;
                }

                if (mapOutput.getType() == MapOutput.Type.WAIT) {
                    LOG.info("fetcher#" + this.id + " - MergerManager returned Status.WAIT ...");
                    return EMPTY_ATTEMPT_ID_ARRAY;
                }
                LOG.info("fetcher#" + this.id + " about to shuffle output of map " + mapOutput.getAttemptIdentifier() + " decomp: " + j + " len: " + j2 + " to " + mapOutput.getType());
                if (mapOutput.getType() == MapOutput.Type.MEMORY) {
                    ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), dataInputStream, (int) j, (int) j2, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
                } else if (mapOutput.getType() == MapOutput.Type.DISK) {
                    ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), mapHost.getHostIdentifier(), dataInputStream, j2, LOG, mapOutput.getAttemptIdentifier().toString());
                } else {
                    throw new IOException("Unknown mapOutput type while fetching shuffle data:" + mapOutput.getType());
                }

                long currentTimeMillis2 = System.currentTimeMillis();
                // A successful copy closes any open retry window.
                this.retryStartTime = 0L;
                this.scheduler.copySucceeded(inputAttemptIdentifier, mapHost, j2, j, currentTimeMillis2 - currentTimeMillis, mapOutput);
                this.remaining.remove(inputAttemptIdentifier);
                this.metrics.successFetch();
                return null;
            } catch (IllegalArgumentException e2) {
                if (this.stopped) {
                    LOG.info("Already shutdown. Ignoring invalid map id error. Exception: " + e2.getClass().getName() + ", Message: " + e2.getMessage());
                    return EMPTY_ATTEMPT_ID_ARRAY;
                }
                this.badIdErrs.increment(1L);
                LOG.warn("Invalid map id ", e2);
                return new InputAttemptIdentifier[]{getNextRemainingAttempt()};
            }
        } catch (IOException e3) {
            if (this.stopped) {
                LOG.info("Not reporting fetch failure for exception during data copy: [" + e3.getClass().getName() + ", " + e3.getMessage() + "]");
                cleanupCurrentConnection(true);
                if (mapOutput != null) {
                    mapOutput.abort();
                }
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            if (shouldRetry(mapHost, e3)) {
                // Release the reservation before asking the caller to reconnect.
                if (mapOutput != null) {
                    mapOutput.abort();
                }
                throw new FetcherReadTimeoutException(e3);
            }
            this.ioErrs.increment(1L);
            if (inputAttemptIdentifier == null || mapOutput == null) {
                LOG.info("fetcher#" + this.id + " failed to read map header" + inputAttemptIdentifier + " decomp: " + j + ", " + j2, e3);
                // Without a parsed header the failing attempt is unknown: fail all remaining.
                return inputAttemptIdentifier == null ? (InputAttemptIdentifier[]) this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]) : new InputAttemptIdentifier[]{inputAttemptIdentifier};
            }
            LOG.warn("Failed to shuffle output of " + inputAttemptIdentifier + " from " + mapHost.getHostIdentifier(), e3);
            mapOutput.abort();
            this.metrics.failedFetch();
            return new InputAttemptIdentifier[]{inputAttemptIdentifier};
        }
    }

    /**
     * Decides whether a failed read should be retried on a fresh connection.
     *
     * <p>Only {@link SocketTimeoutException}s are retryable. The first timeout starts a retry
     * window; retries are allowed while the time elapsed since then is below the configured
     * read timeout.
     *
     * @param mapHost     host being fetched from (used in log messages)
     * @param iOException the failure to classify
     * @return {@code true} if the caller should reconnect and retry
     */
    private boolean shouldRetry(MapHost mapHost, IOException iOException) {
        if (iOException instanceof SocketTimeoutException) {
            long now = System.currentTimeMillis();
            if (this.retryStartTime == 0) {
                // First timeout in this sequence: open the retry window.
                this.retryStartTime = now;
            }
            long elapsed = now - this.retryStartTime;
            if (elapsed < this.httpConnectionParams.getReadTimeout()) {
                LOG.warn("Shuffle output from " + mapHost.getHostIdentifier() + " failed, retry it.");
                return true;
            }
            LOG.warn("Timeout for copying MapOutput with retry on host " + mapHost + "after " + this.httpConnectionParams.getReadTimeout() + "milliseconds.");
            return false;
        }
        return false;
    }

    /**
     * Validates the values read from a shuffle header, incrementing the matching error
     * counter and logging a warning on the first check that fails.
     *
     * @param j                      compressed length from the header
     * @param j2                     uncompressed length from the header
     * @param i                      partition from the header
     * @param list                   attempts still expected from the host
     * @param inputAttemptIdentifier attempt resolved from the header's map id
     * @return {@code true} if both lengths are non-negative, the partition equals the one
     *         currently being fetched and the attempt is contained in {@code list}
     */
    private boolean verifySanity(long j, long j2, int i, List<InputAttemptIdentifier> list, InputAttemptIdentifier inputAttemptIdentifier) {
        boolean negativeLength = j < 0 || j2 < 0;
        if (negativeLength) {
            this.wrongLengthErrs.increment(1L);
            LOG.warn(getName() + " invalid lengths in map output header: id: " + inputAttemptIdentifier + " len: " + j + ", decomp len: " + j2);
            return false;
        }

        if (this.currentPartition != i) {
            this.wrongReduceErrs.increment(1L);
            LOG.warn(getName() + " data for the wrong partition map: " + inputAttemptIdentifier + " len: " + j + " decomp len: " + j2 + " for partition " + i + ", expected partition: " + this.currentPartition);
            return false;
        }

        if (!list.contains(inputAttemptIdentifier)) {
            this.wrongMapErrs.increment(1L);
            LOG.warn("Invalid map-output! Received output for " + inputAttemptIdentifier);
            return false;
        }

        return true;
    }

    /**
     * Returns the first attempt still waiting to be fetched.
     *
     * @return the head of {@code remaining}, or {@code null} when nothing is left
     */
    private InputAttemptIdentifier getNextRemainingAttempt() {
        return this.remaining.isEmpty() ? null : this.remaining.iterator().next();
    }

    /**
     * Fetches the map outputs the scheduler lists for {@code mapHost} directly from local
     * disk instead of over HTTP.
     *
     * <p>Each attempt's output file and index record are resolved and handed to the scheduler
     * as a successful copy; an attempt that fails with an I/O error is reported as failed and
     * stays in {@code remaining}. Iteration stops early once the fetcher has been stopped.
     *
     * <p>Whatever is still in {@code remaining} on exit is put back to the scheduler exactly
     * once, by the {@code finally} block.
     *
     * @param mapHost the (local) host whose outputs are fetched
     * @throws InterruptedException declared by the signature
     */
    @VisibleForTesting
    protected void setupLocalDiskFetch(MapHost mapHost) throws InterruptedException {
        List<InputAttemptIdentifier> mapsForHost = this.scheduler.getMapsForHost(mapHost);
        this.currentPartition = mapHost.getPartitionId();
        if (mapsForHost.size() == 0) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetcher " + this.id + " going to fetch (local disk) from " + mapHost + " for: " + mapsForHost + ", partitionId: " + this.currentPartition);
        }
        this.remaining = new LinkedList<>(mapsForHost);
        try {
            Iterator<InputAttemptIdentifier> it = this.remaining.iterator();
            while (it.hasNext()) {
                // Do not start another read once the fetcher has been stopped.
                if (this.stopped) {
                    return;
                }
                InputAttemptIdentifier next = it.next();
                MapOutput mapOutput = null;
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    Path shuffleInputFileName = getShuffleInputFileName(next.getPathComponent(), null);
                    TezIndexRecord indexRecord = getIndexRecord(next.getPathComponent(), this.currentPartition);
                    mapOutput = getMapOutputForDirectDiskFetch(next, shuffleInputFileName, indexRecord);
                    this.scheduler.copySucceeded(next, mapHost, indexRecord.getPartLength(), indexRecord.getRawLength(), System.currentTimeMillis() - currentTimeMillis, mapOutput);
                    // Fetched: remove it so the finally block does not hand it back.
                    it.remove();
                    this.metrics.successFetch();
                } catch (IOException e) {
                    if (mapOutput != null) {
                        mapOutput.abort();
                    }
                    if (this.stopped) {
                        LOG.info("Ignoring fetch error during local disk copy since fetcher has already been stopped");
                        return;
                    }
                    this.metrics.failedFetch();
                    this.ioErrs.increment(1L);
                    this.scheduler.copyFailed(next, mapHost, true, false);
                    LOG.warn("Failed to read local disk output of " + next + " from " + mapHost.getHostIdentifier(), e);
                }
            }
        } finally {
            // Single hand-back point for every exit path.
            putBackRemainingMapOutputs(mapHost);
        }
    }

    /**
     * Resolves the local path of a map output file, or of a companion file identified by a
     * suffix, through a {@link LocalDirAllocator} over the configured local directories.
     *
     * @param str  path component of the attempt; becomes the directory under {@code output/}
     * @param str2 suffix appended after {@code file.out}; when {@code null} a default suffix
     *             constant is appended instead
     * @return the readable local path reported by the allocator
     * @throws IOException if the allocator cannot resolve the path
     */
    @VisibleForTesting
    protected Path getShuffleInputFileName(String str, String str2) throws IOException {
        LocalDirAllocator allocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);

        StringBuilder relativePath = new StringBuilder();
        relativePath.append("output/").append(str).append("/file.out");
        // NOTE(review): the fallback constant is presumably an empty string — confirm.
        relativePath.append(str2 != null ? str2 : TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT);

        return allocator.getLocalPathToRead(relativePath.toString(), this.conf);
    }

    /**
     * Loads the spill index that accompanies an attempt's output file and returns the entry
     * for one partition.
     *
     * @param str path component of the attempt
     * @param i   index of the wanted partition
     * @return the index record for partition {@code i}
     * @throws IOException if the index file cannot be located or read
     */
    @VisibleForTesting
    protected TezIndexRecord getIndexRecord(String str, int i) throws IOException {
        Path indexFile = getShuffleInputFileName(str, ".index");
        TezSpillRecord spillRecord = new TezSpillRecord(indexFile, this.conf);
        return spillRecord.getIndex(i);
    }

    /**
     * Builds a local-disk {@link MapOutput} covering the byte range that the index record
     * describes inside the given file.
     *
     * @param inputAttemptIdentifier attempt the output belongs to
     * @param path                   local file holding the output
     * @param tezIndexRecord         supplies the start offset and partition length
     * @return the map output created by {@code MapOutput.createLocalDiskMapOutput}
     * @throws IOException if the map output cannot be created
     */
    @VisibleForTesting
    protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier inputAttemptIdentifier, Path path, TezIndexRecord tezIndexRecord) throws IOException {
        long startOffset = tezIndexRecord.getStartOffset();
        long partLength = tezIndexRecord.getPartLength();
        return MapOutput.createLocalDiskMapOutput(inputAttemptIdentifier, this.merger, path, startOffset, partLength, true);
    }

    // Class initialisation: logger, assertion flag, fetcher id sequence and the shared
    // empty result array. The four assignments are independent of one another.
    static {
        LOG = LoggerFactory.getLogger(FetcherOrderedGrouped.class);
        // True when assertions are disabled for this class.
        $assertionsDisabled = !FetcherOrderedGrouped.class.desiredAssertionStatus();
        // Shared zero-length array returned by copyMapOutput when there is nothing to report.
        EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];
        // Fetcher ids are handed out by pre-incrementing this value.
        nextId = 0;
    }
}
