package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.class */
/**
 * Routes events received by an ordered-grouped shuffle input to the {@link ShuffleScheduler}.
 *
 * <ul>
 *   <li>A {@link DataMovementEvent} is decoded and either registered as a known map output, or,
 *       when its payload flags the source partition as empty, reported to the scheduler as an
 *       already-succeeded copy so that nothing is fetched.</li>
 *   <li>An {@link InputFailedEvent} marks the corresponding input attempt as obsolete.</li>
 * </ul>
 *
 * <p>Counters of the events seen are kept and logged periodically via
 * {@link #logProgress(boolean)}.
 */
public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class);

    /** Amount by which the progress-logging threshold is raised each time progress is logged. */
    private static final int LOG_PROGRESS_EVENT_INTERVAL = 50;

    private final ShuffleScheduler scheduler;
    private final InputContext inputContext;
    /** Forwarded to {@code ShuffleUtils.constructBaseURIForShuffleHandler} when building URIs. */
    private final boolean sslShuffle;
    /** Largest run duration reported by any payload seen so far. */
    private int maxMapRuntime = 0;
    /** Progress is logged once the number of handled events exceeds this threshold. */
    private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
    private final AtomicInteger numDmeEvents = new AtomicInteger(0);
    private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
    private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);

    /**
     * @param inputContext     context of the input; supplies the source vertex name used in log
     *                         output and the application id used when building shuffle URIs
     * @param shuffleScheduler scheduler that is notified about known, empty and obsolete inputs
     * @param sslShuffle       flag passed through to the shuffle-handler base URI construction
     */
    public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, ShuffleScheduler shuffleScheduler, boolean sslShuffle) {
        this.inputContext = inputContext;
        this.scheduler = shuffleScheduler;
        this.sslShuffle = sslShuffle;
    }

    /**
     * Handles each event of the given list, in list order.
     *
     * @param list events to handle
     * @throws IOException if handling one of the events fails with an I/O error
     */
    @Override // org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler
    public void handleEvents(List<Event> list) throws IOException {
        for (Event event : list) {
            handleEvent(event);
        }
    }

    /**
     * Logs the event counters gathered so far at INFO level.
     *
     * @param updateOnClose when {@code true}, the log line is suffixed with
     *                      {@code ", updateOnClose"}
     */
    @Override // org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler
    public void logProgress(boolean updateOnClose) {
        // A plain empty suffix keeps the log text independent of unrelated configuration defaults.
        String suffix = updateOnClose ? ", updateOnClose" : "";
        LOG.info(this.inputContext.getSourceVertexName() + ": numDmeEventsSeen=" + this.numDmeEvents.get() + ", numDmeEventsSeenWithNoData=" + this.numDmeEventsNoData.get() + ", numObsoletionEventsSeen=" + this.numObsoletionEvents.get() + suffix);
    }

    /**
     * Dispatches a single event by type and updates the matching counter. Events that are
     * neither {@link DataMovementEvent} nor {@link InputFailedEvent} are ignored.
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            this.numDmeEvents.incrementAndGet();
            processDataMovementEvent((DataMovementEvent) event);
            this.scheduler.updateEventReceivedTime();
        } else if (event instanceof InputFailedEvent) {
            this.numObsoletionEvents.incrementAndGet();
            processTaskFailedEvent((InputFailedEvent) event);
        }
        // The threshold starts at 0, so the first counted event is logged; afterwards the
        // threshold moves up by a fixed interval each time progress is logged.
        int eventsSeen = this.numDmeEvents.get() + this.numObsoletionEvents.get();
        if (eventsSeen > this.nextToLogEventCount.get()) {
            logProgress(false);
            this.nextToLogEventCount.addAndGet(LOG_PROGRESS_EVENT_INTERVAL);
        }
    }

    /**
     * Decodes the payload of a data movement event and informs the scheduler about it.
     *
     * @throws TezUncheckedException if the payload cannot be parsed, or if handling of an empty
     *                               partition fails with an {@link IOException}
     */
    private void processDataMovementEvent(DataMovementEvent dataMovementEvent) throws IOException {
        ShuffleUserPayloads.DataMovementEventPayloadProto payload;
        // Only the parse call is guarded, so that the "unable to parse" error is never
        // reported for a failure that happened later in this method.
        try {
            payload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(dataMovementEvent.getUserPayload()));
        } catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }

        int partitionId = dataMovementEvent.getSourceIndex();
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dataMovementEvent.getTargetIndex() + ", attemptNum: " + dataMovementEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(payload));
        }

        // Keep the scheduler informed about the longest run duration reported so far.
        int runDuration = payload.getRunDuration();
        if (runDuration > this.maxMapRuntime) {
            this.maxMapRuntime = runDuration;
            this.scheduler.informMaxMapRunTime(this.maxMapRuntime);
        }

        if (payload.hasEmptyPartitions()) {
            try {
                boolean partitionIsEmpty = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(payload.getEmptyPartitions())).get(partitionId);
                if (partitionIsEmpty) {
                    InputAttemptIdentifier emptyAttempt = constructInputAttemptIdentifier(dataMovementEvent, payload);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + emptyAttempt + "]. Not fetching.");
                    }
                    this.numDmeEventsNoData.incrementAndGet();
                    // Nothing to fetch: report the attempt as copied with zero sizes.
                    this.scheduler.copySucceeded(emptyAttempt, null, 0L, 0L, 0L, null);
                    return;
                }
            } catch (IOException e) {
                throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
            }
        }

        String host = payload.getHost();
        int port = payload.getPort();
        InputAttemptIdentifier srcAttempt = constructInputAttemptIdentifier(dataMovementEvent, payload);
        URI baseUri = getBaseURI(host, port, partitionId);
        this.scheduler.addKnownMapOutput(host, port, partitionId, baseUri.toString(), srcAttempt);
    }

    /**
     * Tells the scheduler that the input attempt identified by the event's target index and
     * version is obsolete.
     */
    private void processTaskFailedEvent(InputFailedEvent inputFailedEvent) {
        InputAttemptIdentifier obsoleteAttempt = new InputAttemptIdentifier(inputFailedEvent.getTargetIndex(), inputFailedEvent.getVersion());
        this.scheduler.obsoleteInput(obsoleteAttempt);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Obsoleting output of src-task: " + obsoleteAttempt);
        }
    }

    /**
     * Builds the base URI for the given host, port and partition, using the application id of
     * the input context and the configured SSL flag.
     *
     * @param host        host taken from the event payload
     * @param port        port taken from the event payload
     * @param partitionId source index of the data movement event
     * @return the base URI produced by {@code ShuffleUtils.constructBaseURIForShuffleHandler}
     */
    @VisibleForTesting
    URI getBaseURI(String host, int port, int partitionId) {
        String applicationId = this.inputContext.getApplicationId().toString();
        return URI.create(ShuffleUtils.constructBaseURIForShuffleHandler(host, port, partitionId, applicationId, this.sslShuffle).toString());
    }

    /**
     * Creates the identifier of the input attempt an event refers to. When the payload carries
     * a spill id, the identifier additionally records that id and whether this is the final or
     * an incremental update; otherwise only target index, version and path component are used.
     */
    private InputAttemptIdentifier constructInputAttemptIdentifier(DataMovementEvent dataMovementEvent, ShuffleUserPayloads.DataMovementEventPayloadProto dataMovementEventPayloadProto) {
        String pathComponent = dataMovementEventPayloadProto.hasPathComponent() ? dataMovementEventPayloadProto.getPathComponent() : null;
        if (!dataMovementEventPayloadProto.hasSpillId()) {
            return new InputAttemptIdentifier(dataMovementEvent.getTargetIndex(), dataMovementEvent.getVersion(), pathComponent);
        }
        InputAttemptIdentifier.SPILL_INFO spillInfo = dataMovementEventPayloadProto.getLastEvent()
                ? InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE
                : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
        return new InputAttemptIdentifier(new InputIdentifier(dataMovementEvent.getTargetIndex()), dataMovementEvent.getVersion(), pathComponent, false, spillInfo, dataMovementEventPayloadProto.getSpillId());
    }
}
