package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Inflater;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleInputEventHandlerOrderedGrouped.class */
/**
 * Dispatches shuffle input events to a {@link ShuffleScheduler}.
 *
 * <p>Three event types are acted upon: {@link DataMovementEvent},
 * {@link CompositeRoutedDataMovementEvent} and {@link InputFailedEvent}. Events of any other type
 * are ignored. Counters of the events seen are kept and periodically written to the log.
 */
public class ShuffleInputEventHandlerOrderedGrouped implements ShuffleEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class);

    /** Spacing, in number of events seen, between two progress log lines. */
    private static final int PROGRESS_LOG_INTERVAL = 50;

    private final ShuffleScheduler scheduler;
    private final InputContext inputContext;
    private final boolean compositeFetch;
    /** Event count that has to be exceeded before the next progress line is logged. */
    private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
    private final AtomicInteger numDmeEvents = new AtomicInteger(0);
    private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
    private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);
    // One inflater is reused for every payload decoded by this handler and is never released here.
    // NOTE(review): presumably events are not handled concurrently on one instance - confirm.
    private final Inflater inflater = TezCommonUtils.newInflater();

    /**
     * @param inputContext context used to obtain the vertex names shown in progress log lines
     * @param shuffleScheduler scheduler that receives every decoded event
     * @param compositeFetch when {@code true}, a composite event is handed to the scheduler as a
     *     single unit; when {@code false}, it is expanded into one data movement event per offset
     */
    public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, ShuffleScheduler shuffleScheduler, boolean compositeFetch) {
        this.inputContext = inputContext;
        this.scheduler = shuffleScheduler;
        this.compositeFetch = compositeFetch;
    }

    /**
     * Handles the given events one by one, in list order.
     *
     * @param events events to handle
     * @throws IOException if handling one of the events fails with an I/O error
     */
    @Override
    public void handleEvents(List<Event> events) throws IOException {
        for (Event event : events) {
            handleEvent(event);
        }
    }

    /**
     * Writes the current event counters to the log at INFO level.
     *
     * @param updateOnClose when {@code true}, {@code ", updateOnClose"} is appended to the message
     */
    @Override
    public void logProgress(boolean updateOnClose) {
        // NOTE(review): the fallback suffix is a metrics-session-id default, which looks like a
        // compile-time constant that was folded in (presumably the empty string) - confirm.
        String suffix = updateOnClose ? ", updateOnClose" : TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT;
        LOG.info(this.inputContext.getInputOutputVertexNames()
                + ": numDmeEventsSeen=" + this.numDmeEvents.get()
                + ", numDmeEventsSeenWithNoData=" + this.numDmeEventsNoData.get()
                + ", numObsoletionEventsSeen=" + this.numObsoletionEvents.get()
                + suffix);
    }

    /**
     * Decodes a single event, forwards it to the scheduler and logs progress when due.
     *
     * @throws IOException if the scheduler fails while processing the event
     * @throws TezUncheckedException if the event payload cannot be parsed or decompressed
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            this.numDmeEvents.incrementAndGet();
            DataMovementEvent dmEvent = (DataMovementEvent) event;
            ShuffleUserPayloads.DataMovementEventPayloadProto payload = parsePayload(ByteString.copyFrom(dmEvent.getUserPayload()));
            BitSet emptyPartitions = decodeEmptyPartitions(payload);
            processDataMovementEvent(dmEvent, payload, emptyPartitions);
            this.scheduler.updateEventReceivedTime();
        } else if (event instanceof CompositeRoutedDataMovementEvent) {
            CompositeRoutedDataMovementEvent compositeEvent = (CompositeRoutedDataMovementEvent) event;
            ShuffleUserPayloads.DataMovementEventPayloadProto payload = parsePayload(ByteString.copyFrom(compositeEvent.getUserPayload()));
            BitSet emptyPartitions = decodeEmptyPartitions(payload);
            if (this.compositeFetch) {
                this.numDmeEvents.addAndGet(compositeEvent.getCount());
                processCompositeRoutedDataMovementEvent(compositeEvent, payload, emptyPartitions);
            } else {
                // Without composite fetch, every offset of the composite event is handled as an
                // independent data movement event sharing the same payload.
                for (int offset = 0; offset < compositeEvent.getCount(); offset++) {
                    this.numDmeEvents.incrementAndGet();
                    processDataMovementEvent(compositeEvent.expand(offset), payload, emptyPartitions);
                }
            }
            this.scheduler.updateEventReceivedTime();
        } else if (event instanceof InputFailedEvent) {
            this.numObsoletionEvents.incrementAndGet();
            processTaskFailedEvent((InputFailedEvent) event);
        }

        int eventsSeen = this.numDmeEvents.get() + this.numObsoletionEvents.get();
        int threshold = this.nextToLogEventCount.get();
        if (eventsSeen > threshold) {
            logProgress(false);
            // A composite event can raise the count by more than one interval. Advance the
            // threshold by as many whole intervals as needed to reach the current count, so the
            // following events do not each trigger another progress line while it catches up.
            int intervalsToAdvance = (eventsSeen - threshold + PROGRESS_LOG_INTERVAL - 1) / PROGRESS_LOG_INTERVAL;
            this.nextToLogEventCount.addAndGet(intervalsToAdvance * PROGRESS_LOG_INTERVAL);
        }
    }

    /**
     * Parses the serialized user payload of a (composite) data movement event.
     *
     * @throws TezUncheckedException if the bytes are not a valid payload message
     */
    private static ShuffleUserPayloads.DataMovementEventPayloadProto parsePayload(ByteString serializedPayload) {
        try {
            return ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(serializedPayload);
        } catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }
    }

    /**
     * Decompresses the empty-partitions field of the payload into a bit set.
     *
     * @return the decoded bit set, or {@code null} when the payload has no empty-partitions field
     * @throws TezUncheckedException if decompression fails
     */
    private BitSet decodeEmptyPartitions(ShuffleUserPayloads.DataMovementEventPayloadProto payload) {
        if (!payload.hasEmptyPartitions()) {
            return null;
        }
        try {
            byte[] rawBits = TezCommonUtils.decompressByteStringToByteArray(payload.getEmptyPartitions(), this.inflater);
            return TezUtilsInternal.fromByteArray(rawBits);
        } catch (IOException e) {
            throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
        }
    }

    /**
     * Forwards a single data movement event to the scheduler.
     *
     * <p>If the event's source index is set in {@code bitSet}, the input is reported to the
     * scheduler as an already successful copy and nothing is registered for fetching. Otherwise
     * the output location (host and port from the payload) is registered with the scheduler.
     *
     * @param bitSet decoded empty-partitions bit set, or {@code null} if the payload carries none
     * @throws IOException if registering the output location with the scheduler fails
     * @throws TezUncheckedException if reporting the empty input as succeeded fails
     */
    private void processDataMovementEvent(DataMovementEvent dataMovementEvent, ShuffleUserPayloads.DataMovementEventPayloadProto dataMovementEventPayloadProto, BitSet bitSet) throws IOException {
        int partitionId = dataMovementEvent.getSourceIndex();
        CompositeInputAttemptIdentifier srcAttempt = constructInputAttemptIdentifier(dataMovementEvent.getTargetIndex(), 1, dataMovementEvent.getVersion(), dataMovementEventPayloadProto);
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: " + partitionId + ", targetIdx: " + dataMovementEvent.getTargetIndex() + ", attemptNum: " + dataMovementEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(dataMovementEventPayloadProto));
        }
        if (bitSet != null && bitSet.get(partitionId)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + srcAttempt + "]. Not fetching.");
            }
            this.numDmeEventsNoData.getAndIncrement();
            try {
                this.scheduler.copySucceeded(srcAttempt.expand(0), null, 0L, 0L, 0L, null, true);
            } catch (IOException e) {
                throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
            }
            return;
        }
        this.scheduler.addKnownMapOutput(StringInterner.weakIntern(dataMovementEventPayloadProto.getHost()), dataMovementEventPayloadProto.getPort(), partitionId, srcAttempt);
    }

    /**
     * Forwards a composite event to the scheduler as one unit.
     *
     * <p>Each offset whose partition (source index + offset) is set in {@code bitSet} is reported
     * to the scheduler as an already successful copy. Unless every offset was reported that way,
     * the output location is then registered with the scheduler for the composite identifier.
     *
     * @param bitSet decoded empty-partitions bit set, or {@code null} if the payload carries none
     * @throws IOException if a scheduler call fails
     */
    private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent compositeRoutedDataMovementEvent, ShuffleUserPayloads.DataMovementEventPayloadProto dataMovementEventPayloadProto, BitSet bitSet) throws IOException {
        int firstPartitionId = compositeRoutedDataMovementEvent.getSourceIndex();
        int count = compositeRoutedDataMovementEvent.getCount();
        CompositeInputAttemptIdentifier srcAttempt = constructInputAttemptIdentifier(compositeRoutedDataMovementEvent.getTargetIndex(), count, compositeRoutedDataMovementEvent.getVersion(), dataMovementEventPayloadProto);
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: " + firstPartitionId + ", targetIdx: " + compositeRoutedDataMovementEvent.getTargetIndex() + ", count:" + count + ", attemptNum: " + compositeRoutedDataMovementEvent.getVersion() + ", payload: " + ShuffleUtils.stringify(dataMovementEventPayloadProto));
        }
        if (bitSet != null) {
            boolean allEmpty = true;
            for (int offset = 0; offset < count; offset++) {
                int partitionId = firstPartitionId + offset;
                if (!bitSet.get(partitionId)) {
                    allEmpty = false;
                    continue;
                }
                InputAttemptIdentifier emptyAttempt = srcAttempt.expand(offset);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + emptyAttempt + "]. Not fetching.");
                }
                this.numDmeEventsNoData.getAndIncrement();
                this.scheduler.copySucceeded(emptyAttempt, null, 0L, 0L, 0L, null, true);
            }
            if (allEmpty) {
                // Every partition covered by this event was reported above; nothing to register.
                return;
            }
        }
        this.scheduler.addKnownMapOutput(StringInterner.weakIntern(dataMovementEventPayloadProto.getHost()), dataMovementEventPayloadProto.getPort(), firstPartitionId, srcAttempt);
    }

    /** Tells the scheduler that the input attempt named by the failed event is obsolete. */
    private void processTaskFailedEvent(InputFailedEvent inputFailedEvent) {
        InputAttemptIdentifier obsoleteAttempt = new InputAttemptIdentifier(inputFailedEvent.getTargetIndex(), inputFailedEvent.getVersion());
        this.scheduler.obsoleteInput(obsoleteAttempt);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Obsoleting output of src-task: " + obsoleteAttempt);
        }
    }

    /**
     * Builds the attempt identifier for an event from its indices and payload.
     *
     * @param targetIndex target index of the event
     * @param targetIndexCount number of inputs covered (1 for a plain data movement event, the
     *     event's count for a composite event)
     * @param version version of the event
     * @param dataMovementEventPayloadProto payload supplying the optional path component and
     *     spill information
     */
    private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version, ShuffleUserPayloads.DataMovementEventPayloadProto dataMovementEventPayloadProto) {
        String pathComponent = dataMovementEventPayloadProto.hasPathComponent() ? StringInterner.weakIntern(dataMovementEventPayloadProto.getPathComponent()) : null;
        if (!dataMovementEventPayloadProto.hasSpillId()) {
            return new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, targetIndexCount);
        }
        // With a spill id present, the payload's last-event flag selects final vs. incremental.
        InputAttemptIdentifier.SPILL_INFO spillInfo = dataMovementEventPayloadProto.getLastEvent()
                ? InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE
                : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
        return new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, false, spillInfo, dataMovementEventPayloadProto.getSpillId(), targetIndexCount);
    }
}
