/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleClientMetrics;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FetcherOrderedGrouped
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(FetcherOrderedGrouped.class);
    /** Configuration handed to the local-dir allocator and the spill-record reader for local disk fetches. */
    private final Configuration conf;
    /** When true, a host whose identifier equals {@link #localShuffleHostPort} is read from local disk. */
    private final boolean localDiskFetchEnabled;
    /** Counter group under which all of the error counters below are looked up. */
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
    // Error counters; each one is looked up in the constructor using a ShuffleErrors constant name.
    private final TezCounter connectionErrs;
    private final TezCounter ioErrs;
    private final TezCounter wrongLengthErrs;
    private final TezCounter badIdErrs;
    private final TezCounter wrongMapErrs;
    private final TezCounter wrongReduceErrs;
    private final MergeManager merger;
    private final ShuffleScheduler scheduler;
    private final ShuffleClientMetrics metrics;
    /** Receives any unexpected Throwable that escapes the fetch loop in run(). */
    private final Shuffle shuffle;
    /** Sequence number of this fetcher; appears in the thread name and in log messages. */
    private final int id;
    private final String logIdentifier;
    /** "hostname:port" string compared against a host identifier to detect the local host. */
    private final String localShuffleHostPort;
    // Source of fetcher ids. NOTE(review): incremented in the constructor without synchronization.
    private static int nextId = 0;
    /** Partition id of the host currently being fetched from; -1 until a fetch has been started. */
    private int currentPartition = -1;
    private final CompressionCodec codec;
    private final JobTokenSecretManager jobTokenSecretManager;
    /** Set by shutDown(); checked throughout the fetch paths to suppress error reporting. */
    @VisibleForTesting
    volatile boolean stopped = false;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    /** Attempts that still have to be fetched from the current host; reset to null by run() before each fetch. */
    private LinkedList<InputAttemptIdentifier> remaining;
    // NOTE(review): not referenced by any code visible in this class.
    volatile HttpURLConnection connection;
    /** Stream obtained from httpConnection in setupConnection(). */
    volatile DataInputStream input;
    /** Host handed out by the scheduler for the fetch in progress, or null. */
    volatile MapHost assignedHost = null;
    HttpConnection httpConnection;
    HttpConnection.HttpConnectionParams httpConnectionParams;
    /** Time (ms) at which the current read-timeout retry window started; 0 means no window is open. */
    private long retryStartTime = 0L;
    /** Monitor guarding httpConnection.cleanup(); final so the monitor identity can never change. */
    private final Object cleanupLock = new Object();
    /** Shared empty result for copyMapOutput(); the reference is never reassigned. */
    private static final InputAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new InputAttemptIdentifier[0];

    public FetcherOrderedGrouped(HttpConnection.HttpConnectionParams httpConnectionParams, ShuffleScheduler scheduler, MergeManager merger, ShuffleClientMetrics metrics, Shuffle shuffle, JobTokenSecretManager jobTokenSecretMgr, boolean ifileReadAhead, int ifileReadAheadLength, CompressionCodec codec, InputContext inputContext, Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) throws IOException {
        this.setDaemon(true);
        this.scheduler = scheduler;
        this.merger = merger;
        this.metrics = metrics;
        this.shuffle = shuffle;
        this.id = ++nextId;
        this.jobTokenSecretManager = jobTokenSecretMgr;
        this.ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());
        this.ifileReadAhead = ifileReadAhead;
        this.ifileReadAheadLength = ifileReadAheadLength;
        this.httpConnectionParams = httpConnectionParams;
        this.codec = codec != null ? codec : null;
        this.conf = conf;
        this.localShuffleHostPort = localHostname + ":" + String.valueOf(shufflePort);
        this.localDiskFetchEnabled = localDiskFetchEnabled;
        this.logIdentifier = "fetcher {" + TezUtilsInternal.cleanVertexName((String)inputContext.getSourceVertexName()) + "} #" + this.id;
        this.setName(this.logIdentifier);
        this.setDaemon(true);
    }

    /**
     * Performs one fetch cycle: waits on the merger, asks the scheduler for a host, and then copies
     * that host's outputs either from local disk or over the network.
     *
     * <p>Whatever happens, the current connection is cleaned up (without disconnecting) and, if a
     * host had been assigned, it is handed back to the scheduler and the thread is marked free.
     *
     * @throws InterruptedException propagated from the merger, scheduler or local disk fetch
     * @throws IOException propagated from the network copy
     */
    @VisibleForTesting
    protected void fetchNext() throws InterruptedException, IOException {
        assignedHost = null;
        try {
            merger.waitForInMemoryMerge();
            merger.waitForShuffleToMergeMemory();

            assignedHost = scheduler.getHost();
            metrics.threadBusy();

            String hostIdentifier = assignedHost.getHostIdentifier();
            boolean readFromLocalDisk = localDiskFetchEnabled && hostIdentifier.equals(localShuffleHostPort);
            if (!readFromLocalDisk) {
                copyFromHost(assignedHost);
            } else {
                setupLocalDiskFetch(assignedHost);
            }
        } finally {
            cleanupCurrentConnection(false);
            if (assignedHost != null) {
                scheduler.freeHost(assignedHost);
                metrics.threadFree();
            }
        }
    }

    /**
     * Fetch loop: calls {@link #fetchNext()} until the fetcher is stopped or the thread is interrupted.
     *
     * <p>An {@link InterruptedException} ends the loop quietly. Any other throwable is handed to
     * the owning {@code Shuffle} via {@code reportException}.
     */
    @Override
    public void run() {
        try {
            while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                // Clear the per-host work list before every cycle.
                this.remaining = null;
                this.fetchNext();
            }
        }
        catch (InterruptedException ie) {
            // Catching InterruptedException clears the interrupt flag. Re-assert it so that code
            // running on this thread after run() returns (e.g. a direct caller) can still see it.
            Thread.currentThread().interrupt();
        }
        catch (Throwable t) {
            this.shuffle.reportException(t);
        }
    }

    /**
     * Stops this fetcher: sets the stop flag, interrupts the thread, cleans up the current
     * connection with disconnect requested, and then waits up to five seconds for the thread to end.
     *
     * <p>An interrupt received while waiting is logged and not propagated.
     *
     * @throws InterruptedException declared by the signature; the join interrupt itself is caught here
     */
    public void shutDown() throws InterruptedException {
        stopped = true;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetcher stopped for host " + assignedHost);
        }

        interrupt();
        cleanupCurrentConnection(true);

        final long joinTimeoutMillis = 5000L;
        try {
            join(joinTimeoutMillis);
        } catch (InterruptedException ie) {
            LOG.warn("Got interrupt while joining " + getName(), ie);
        }
    }

    /**
     * Cleans up the current HTTP connection, if there is one, while holding {@code cleanupLock}.
     * An {@link IOException} from the cleanup is logged and otherwise ignored.
     *
     * @param disconnect passed through to {@code HttpConnection.cleanup}
     */
    private void cleanupCurrentConnection(boolean disconnect) {
        synchronized (cleanupLock) {
            if (httpConnection == null) {
                return;
            }
            try {
                httpConnection.cleanup(disconnect);
            } catch (IOException e) {
                String what = "Exception while shutting down fetcher " + logIdentifier;
                // Full stack trace at debug level only; the one-line summary is always logged.
                if (LOG.isDebugEnabled()) {
                    LOG.debug(what, e);
                }
                LOG.info(what + ": " + e.getMessage());
            }
        }
    }

    /**
     * Fetches over HTTP every map output that the scheduler currently lists for {@code host}.
     *
     * <p>NOTE(review): the decompiler could not recover this method and left a stub that always
     * threw {@code IllegalStateException("Decompilation failed")}, so every remote fetch failed.
     * This body is a reconstruction derived from the contracts of the sibling methods in this class
     * ({@link #setupConnection}, {@link #copyMapOutput}, {@link #putBackRemainingMapOutputs} and the
     * local-disk variant {@link #setupLocalDiskFetch}). Verify it against the original class file
     * before relying on it.
     *
     * <p>Flow: open a connection for all attempts; call {@code copyMapOutput} repeatedly until the
     * work list is empty or it returns a non-null result; on a read timeout, reconnect for the
     * attempts still outstanding; report any failed attempts to the scheduler unless the fetcher
     * has been stopped. Attempts still in the work list are always put back on the scheduler.
     *
     * @param host host to fetch from
     * @throws IOException if the connection cannot be set up, or if the server stops sending data
     *         while attempts are still outstanding and no failure was reported
     */
    @VisibleForTesting
    protected void copyFromHost(MapHost host) throws IOException {
        // A new host starts with a fresh read-timeout retry window.
        this.retryStartTime = 0L;

        List<InputAttemptIdentifier> srcAttempts = this.scheduler.getMapsForHost(host);
        this.currentPartition = host.getPartitionId();
        if (srcAttempts.isEmpty()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetcher " + this.id + " going to fetch from " + host + " for: " + srcAttempts + ", partitionId: " + this.currentPartition);
        }
        this.remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
        try {
            if (!this.setupConnection(host, srcAttempts)) {
                // setupConnection has already reported the failure (or the fetcher was stopped).
                if (this.stopped) {
                    this.cleanupCurrentConnection(true);
                }
                return;
            }

            // copyMapOutput returns null after a successful copy and a non-null array
            // (possibly empty) when fetching from this host has to stop.
            InputAttemptIdentifier[] failedTasks = null;
            while (!this.remaining.isEmpty() && failedTasks == null) {
                try {
                    failedTasks = this.copyMapOutput(host, this.input);
                }
                catch (FetcherReadTimeoutException e) {
                    // Read timed out inside the retry window: drop the connection and reconnect
                    // for whatever is still outstanding.
                    this.cleanupCurrentConnection(true);
                    if (this.stopped) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Not re-establishing connection since Fetcher has been stopped");
                        }
                        return;
                    }
                    if (!this.setupConnection(host, new LinkedList<InputAttemptIdentifier>(this.remaining))) {
                        if (this.stopped) {
                            this.cleanupCurrentConnection(true);
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Not reporting connection re-establishment failure since fetcher is stopped");
                            }
                            return;
                        }
                        // When not stopped, setupConnection has already called copyFailed for every
                        // remaining attempt, so end the loop without reporting anything further.
                        failedTasks = EMPTY_ATTEMPT_ID_ARRAY;
                    }
                }
            }

            if (failedTasks != null && failedTasks.length > 0) {
                if (this.stopped) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Ignoring copyMapOutput failures for tasks: " + java.util.Arrays.toString(failedTasks) + " since Fetcher has been stopped");
                    }
                } else {
                    LOG.warn("copyMapOutput failed for tasks " + java.util.Arrays.toString(failedTasks));
                    for (InputAttemptIdentifier failed : failedTasks) {
                        // Same flags setupConnection uses for a failure after a successful connect.
                        this.scheduler.copyFailed(failed, host, true, false, false);
                    }
                }
            }

            this.cleanupCurrentConnection(false);

            // No failure was signalled, yet attempts are still outstanding.
            if (failedTasks == null && !this.remaining.isEmpty()) {
                throw new IOException("server didn't return all expected map outputs: " + this.remaining.size() + " left.");
            }
        }
        finally {
            // Anything not fetched goes back to the scheduler.
            this.putBackRemainingMapOutputs(host);
        }
    }

    /**
     * Opens and validates an HTTP connection to {@code host} for the given attempts and stores its
     * input stream in {@code input}.
     *
     * <p>On an {@link IOException}, unless the fetcher has been stopped, the IO error counter is
     * incremented (plus the connection error counter if the connect itself failed) and every
     * attempt in the current work list is reported to the scheduler as failed.
     *
     * @param host host to connect to
     * @param attempts attempts used to build the request URL
     * @return true if the connection was established and validated; false if the fetcher was
     *         stopped or an error occurred
     * @throws IOException declared by the signature; IOExceptions raised in the body are caught here
     */
    @VisibleForTesting
    boolean setupConnection(MapHost host, List<InputAttemptIdentifier> attempts) throws IOException {
        boolean connected = false;
        try {
            URL fetchUrl = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts, httpConnectionParams.getKeepAlive());
            httpConnection = new HttpConnection(fetchUrl, httpConnectionParams, logIdentifier, jobTokenSecretManager);
            connected = httpConnection.connect();
            if (!stopped) {
                input = httpConnection.getInputStream();
                httpConnection.validate();
                return true;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
            }
            return false;
        } catch (IOException ioe) {
            if (stopped) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Not reporting fetch failure, since an Exception was caught after shutdown");
                }
                return false;
            }
            ioErrs.increment(1L);
            if (connected) {
                LOG.warn("Failed to verify reply after connecting to " + host + " with " + remaining.size() + " inputs pending", ioe);
            } else {
                LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ioe);
                connectionErrs.increment(1L);
            }
            // Every outstanding attempt is reported as failed against this host.
            boolean connectFailed = !connected;
            for (InputAttemptIdentifier pending : remaining) {
                scheduler.copyFailed(pending, host, connected, connectFailed, false);
            }
            return false;
        }
    }

    /**
     * Hands every attempt still in the work list back to the scheduler for {@code host}.
     * The first attempt in the list is put back last, after all the others; a null first
     * element is not put back.
     *
     * @param host host the attempts belong to
     */
    @VisibleForTesting
    protected void putBackRemainingMapOutputs(MapHost host) {
        Iterator<InputAttemptIdentifier> pending = remaining.iterator();
        if (!pending.hasNext()) {
            return;
        }
        InputAttemptIdentifier head = pending.next();
        while (pending.hasNext()) {
            scheduler.putBackKnownMapOutput(host, pending.next());
        }
        if (head != null) {
            scheduler.putBackKnownMapOutput(host, head);
        }
    }

    /**
     * Reads one map output (header plus data) from {@code input} and hands it to the merger.
     *
     * <p>Return value:
     * <ul>
     *   <li>{@code null} - the output was copied and reported to the scheduler as succeeded;</li>
     *   <li>an empty array - nothing was copied and no attempt is reported as failed (fetcher
     *       stopped, the merger returned a WAIT output, or reserving space failed);</li>
     *   <li>a non-empty array - the attempt(s) considered failed.</li>
     * </ul>
     *
     * @param host host the data is being read from
     * @param input stream positioned at the next shuffle header
     * @return see above
     * @throws FetcherReadTimeoutException if the copy failed with an IOException for which
     *         {@code shouldRetry} returned true
     */
    protected InputAttemptIdentifier[] copyMapOutput(MapHost host, DataInputStream input) throws FetcherReadTimeoutException {
        MapOutput mapOutput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = -1L;
        long compressedLength = -1L;
        try {
            long startTime = System.currentTimeMillis();
            int forReduce = -1;
            // Step 1: read the header and resolve the attempt it refers to.
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(input);
                if (!header.mapId.startsWith("attempt")) {
                    if (!this.stopped) {
                        this.badIdErrs.increment(1L);
                        LOG.warn("Invalid map id: " + header.mapId + ", expected to start with " + "attempt" + ", partition: " + header.forReduce);
                        // The real attempt is unknown, so the first outstanding one is returned.
                        return new InputAttemptIdentifier[]{this.getNextRemainingAttempt()};
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Already shutdown. Ignoring invalid map id error");
                    }
                    return EMPTY_ATTEMPT_ID_ARRAY;
                }
                srcAttemptId = this.scheduler.getIdentifierForFetchedOutput(header.mapId, header.forReduce);
                compressedLength = header.compressedLength;
                decompressedLength = header.uncompressedLength;
                forReduce = header.forReduce;
            }
            catch (IllegalArgumentException e) {
                if (!this.stopped) {
                    this.badIdErrs.increment(1L);
                    LOG.warn("Invalid map id ", (Throwable)e);
                    return new InputAttemptIdentifier[]{this.getNextRemainingAttempt()};
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Already shutdown. Ignoring invalid map id error. Exception: " + e.getClass().getName() + ", Message: " + e.getMessage());
                }
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            // Step 2: check lengths, partition and that the attempt is one we asked for.
            if (!this.verifySanity(compressedLength, decompressedLength, forReduce, this.remaining, srcAttemptId)) {
                if (!this.stopped) {
                    if (srcAttemptId == null) {
                        LOG.warn("Was expecting " + this.getNextRemainingAttempt() + " but got null");
                        srcAttemptId = this.getNextRemainingAttempt();
                    }
                    assert (srcAttemptId != null);
                    return new InputAttemptIdentifier[]{srcAttemptId};
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Already stopped. Ignoring verification failure.");
                }
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + ", decomp len: " + decompressedLength);
            }
            // Step 3: ask the merger where the data should go.
            try {
                mapOutput = this.merger.reserve(srcAttemptId, decompressedLength, compressedLength, this.id);
            }
            catch (IOException e) {
                if (!this.stopped) {
                    this.ioErrs.increment(1L);
                    // Reported as a local error rather than as a failure of the source attempt.
                    this.scheduler.reportLocalError(e);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Already stopped. Ignoring error from merger.reserve");
                }
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            if (mapOutput.getType() == MapOutput.Type.WAIT) {
                LOG.info("fetcher#" + this.id + " - MergerManager returned Status.WAIT ...");
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("fetcher#" + this.id + " about to shuffle output of map " + mapOutput.getAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + (Object)((Object)mapOutput.getType()));
            }
            // Step 4: copy the data into memory or onto disk, depending on the reservation.
            if (mapOutput.getType() == MapOutput.Type.MEMORY) {
                // NOTE(review): both lengths are narrowed from long to int here.
                ShuffleUtils.shuffleToMemory(mapOutput.getMemory(), input, (int)decompressedLength, (int)compressedLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, mapOutput.getAttemptIdentifier().toString());
            } else if (mapOutput.getType() == MapOutput.Type.DISK) {
                ShuffleUtils.shuffleToDisk(mapOutput.getDisk(), host.getHostIdentifier(), input, compressedLength, decompressedLength, LOG, mapOutput.getAttemptIdentifier().toString());
            } else {
                throw new IOException("Unknown mapOutput type while fetching shuffle data:" + (Object)((Object)mapOutput.getType()));
            }
            // Step 5: success - close the retry window, report, and drop the attempt from the work list.
            long endTime = System.currentTimeMillis();
            this.retryStartTime = 0L;
            this.scheduler.copySucceeded(srcAttemptId, host, compressedLength, decompressedLength, endTime - startTime, mapOutput, false);
            this.remaining.remove(srcAttemptId);
            this.metrics.successFetch();
            return null;
        }
        catch (IOException ioe) {
            // Stopped: clean up quietly without reporting a failure.
            if (this.stopped) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Not reporting fetch failure for exception during data copy: [" + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
                }
                this.cleanupCurrentConnection(true);
                if (mapOutput != null) {
                    mapOutput.abort();
                }
                return EMPTY_ATTEMPT_ID_ARRAY;
            }
            // Read timeout still inside the retry window: signal the caller via the exception.
            if (this.shouldRetry(host, ioe)) {
                if (mapOutput != null) {
                    mapOutput.abort();
                }
                throw new FetcherReadTimeoutException(ioe);
            }
            this.ioErrs.increment(1L);
            // Failure before a destination was reserved.
            if (srcAttemptId == null || mapOutput == null) {
                LOG.info("fetcher#" + this.id + " failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength, (Throwable)ioe);
                if (srcAttemptId == null) {
                    // The attempt is unknown, so all outstanding attempts are returned.
                    return this.remaining.toArray(new InputAttemptIdentifier[this.remaining.size()]);
                }
                return new InputAttemptIdentifier[]{srcAttemptId};
            }
            // Failure while copying the data of a known attempt.
            LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host.getHostIdentifier(), (Throwable)ioe);
            mapOutput.abort();
            this.metrics.failedFetch();
            return new InputAttemptIdentifier[]{srcAttemptId};
        }
    }

    /**
     * Decides whether a failed copy should be retried on a fresh connection.
     *
     * <p>Only a {@link SocketTimeoutException} is retried. The first timeout opens a retry window
     * by recording the current time in {@code retryStartTime}; retries are allowed while the time
     * elapsed since then is less than the configured read timeout.
     *
     * @param host host being read from (used in log messages only)
     * @param ioe the failure raised by the copy
     * @return true if the copy should be retried, false otherwise
     */
    private boolean shouldRetry(MapHost host, IOException ioe) {
        if (!(ioe instanceof SocketTimeoutException)) {
            return false;
        }
        long currentTime = System.currentTimeMillis();
        if (this.retryStartTime == 0L) {
            // First timeout since the last success: open the retry window now.
            this.retryStartTime = currentTime;
        }
        if (currentTime - this.retryStartTime < (long)this.httpConnectionParams.getReadTimeout()) {
            LOG.warn("Shuffle output from " + host.getHostIdentifier() + " failed, retry it.");
            return true;
        }
        // The message used to run words together ("<host>after <n>milliseconds."); spaces added.
        LOG.warn("Timeout for copying MapOutput with retry on host " + host + " after " + this.httpConnectionParams.getReadTimeout() + " milliseconds.");
        return false;
    }

    /**
     * Validates a received shuffle header. Checks are made in this order, and the first one that
     * fails increments its counter, logs a warning and makes the method return false:
     * negative lengths, partition different from {@code currentPartition}, attempt not contained
     * in {@code remaining}.
     *
     * @param compressedLength compressed length from the header
     * @param decompressedLength uncompressed length from the header
     * @param forReduce partition from the header
     * @param remaining attempts that are still expected
     * @param srcAttemptId attempt resolved from the header
     * @return true if all checks pass
     */
    private boolean verifySanity(long compressedLength, long decompressedLength, int forReduce, List<InputAttemptIdentifier> remaining, InputAttemptIdentifier srcAttemptId) {
        boolean negativeLength = compressedLength < 0L || decompressedLength < 0L;
        if (negativeLength) {
            wrongLengthErrs.increment(1L);
            LOG.warn(getName() + " invalid lengths in map output header: id: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength);
        } else if (forReduce != currentPartition) {
            wrongReduceErrs.increment(1L);
            LOG.warn(getName() + " data for the wrong partition map: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for partition " + forReduce + ", expected partition: " + currentPartition);
        } else if (!remaining.contains(srcAttemptId)) {
            wrongMapErrs.increment(1L);
            LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
        } else {
            return true;
        }
        return false;
    }

    /**
     * Returns the first attempt in the work list without removing it.
     *
     * @return the head of {@code remaining}, or null if the list is empty
     */
    private InputAttemptIdentifier getNextRemainingAttempt() {
        // peekFirst() gives the head element, or null for an empty list.
        return remaining.peekFirst();
    }

    /**
     * Fetches the map outputs listed for {@code host} directly from local disk instead of HTTP.
     *
     * <p>For each attempt the data file path and its index record are resolved, a local-disk
     * {@code MapOutput} is created and the copy is reported as succeeded. An attempt that fails
     * with an {@link IOException} is reported as failed and left in the work list. Whatever is
     * still in the work list on exit is put back on the scheduler exactly once, by the
     * {@code finally} block.
     *
     * @param host the (local) host whose outputs are to be read
     * @throws InterruptedException declared by the original signature
     */
    @VisibleForTesting
    protected void setupLocalDiskFetch(MapHost host) throws InterruptedException {
        List<InputAttemptIdentifier> srcAttempts = this.scheduler.getMapsForHost(host);
        this.currentPartition = host.getPartitionId();
        if (srcAttempts.isEmpty()) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Fetcher " + this.id + " going to fetch (local disk) from " + host + " for: " + srcAttempts + ", partitionId: " + this.currentPartition);
        }
        this.remaining = new LinkedList<InputAttemptIdentifier>(srcAttempts);
        try {
            Iterator<InputAttemptIdentifier> iter = this.remaining.iterator();
            while (iter.hasNext()) {
                if (this.stopped) {
                    return;
                }
                InputAttemptIdentifier srcAttemptId = iter.next();
                MapOutput mapOutput = null;
                try {
                    long startTime = System.currentTimeMillis();
                    Path filename = this.getShuffleInputFileName(srcAttemptId.getPathComponent(), null);
                    TezIndexRecord indexRecord = this.getIndexRecord(srcAttemptId.getPathComponent(), this.currentPartition);
                    mapOutput = this.getMapOutputForDirectDiskFetch(srcAttemptId, filename, indexRecord);
                    long endTime = System.currentTimeMillis();
                    this.scheduler.copySucceeded(srcAttemptId, host, indexRecord.getPartLength(), indexRecord.getRawLength(), endTime - startTime, mapOutput, true);
                    // Fetched: drop it from the work list so it is not put back below.
                    iter.remove();
                    this.metrics.successFetch();
                }
                catch (IOException e) {
                    if (mapOutput != null) {
                        mapOutput.abort();
                    }
                    if (this.stopped) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ignoring fetch error during local disk copy since fetcher has already been stopped");
                        }
                        // No explicit put-back here: the finally block does it. Calling it here
                        // as well handed every outstanding attempt back to the scheduler twice.
                        return;
                    }
                    this.metrics.failedFetch();
                    this.ioErrs.increment(1L);
                    this.scheduler.copyFailed(srcAttemptId, host, true, false, true);
                    LOG.warn("Failed to read local disk output of " + srcAttemptId + " from " + host.getHostIdentifier(), e);
                }
            }
        }
        finally {
            this.putBackRemainingMapOutputs(host);
        }
    }

    /**
     * Resolves the readable local path of {@code output/<pathComponent>/file.out<suffix>} using a
     * {@link LocalDirAllocator} keyed on {@code tez.runtime.framework.local.dirs}.
     *
     * @param pathComponent directory component identifying the source attempt's output
     * @param suffix appended to the file name; null is treated as an empty string
     * @return the path returned by the allocator
     * @throws IOException if the allocator cannot resolve the path
     */
    @VisibleForTesting
    protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
        String effectiveSuffix = (suffix == null) ? "" : suffix;
        String relativePath = "output/" + pathComponent + "/" + "file.out" + effectiveSuffix;
        LocalDirAllocator allocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        return allocator.getLocalPathToRead(relativePath, conf);
    }

    /**
     * Loads the {@code .index} spill record that belongs to {@code pathComponent} and returns its
     * entry for the given partition.
     *
     * @param pathComponent directory component identifying the source attempt's output
     * @param partitionId partition whose index entry is wanted
     * @return the index record for {@code partitionId}
     * @throws IOException if the index file cannot be located or read
     */
    @VisibleForTesting
    protected TezIndexRecord getIndexRecord(String pathComponent, int partitionId) throws IOException {
        Path indexPath = getShuffleInputFileName(pathComponent, ".index");
        return new TezSpillRecord(indexPath, conf).getIndex(partitionId);
    }

    /**
     * Creates a local-disk {@code MapOutput} for the byte range of {@code filename} that
     * {@code indexRecord} describes (start offset and partition length).
     *
     * @param srcAttemptId attempt that produced the output
     * @param filename data file holding the output
     * @param indexRecord index entry locating the partition inside the file
     * @return the map output created by {@code MapOutput.createLocalDiskMapOutput}
     * @throws IOException if the map output cannot be created
     */
    @VisibleForTesting
    protected MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier srcAttemptId, Path filename, TezIndexRecord indexRecord) throws IOException {
        long startOffset = indexRecord.getStartOffset();
        long partLength = indexRecord.getPartLength();
        return MapOutput.createLocalDiskMapOutput(srcAttemptId, merger, filename, startOffset, partLength, true);
    }

    private static enum ShuffleErrors {
        IO_ERROR,
        WRONG_LENGTH,
        BAD_ID,
        WRONG_MAP,
        CONNECTION,
        WRONG_REDUCE;

    }
}

