package org.apache.tez.dag.library.vertexmanager;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.DataInputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Inflater;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.io.NonSyncByteArrayInputStream;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase.class */
public abstract class ShuffleVertexManagerBase extends VertexManagerPlugin {
    static long MB = 1048576;
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleVertexManagerBase.class);
    ComputeRoutingAction computeRoutingAction;
    int totalNumBipartiteSourceTasks;
    int numBipartiteSourceTasksCompleted;
    int numVertexManagerEventsReceived;
    List<VertexManagerEvent> pendingVMEvents;
    AtomicBoolean onVertexStartedDone;
    private Set<TaskIdentifier> taskWithVmEvents;
    private final Map<String, SourceVertexInfo> srcVertexInfo;
    boolean sourceVerticesScheduled;

    @VisibleForTesting
    int bipartiteSources;
    long completedSourceTasksOutputSize;
    List<VertexStateUpdate> pendingStateUpdates;
    List<PendingTaskInfo> pendingTasks;
    int totalTasksToSchedule;

    @VisibleForTesting
    Configuration conf;
    ShuffleVertexManagerBaseConfig config;
    final Inflater inflater;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase$ComputeRoutingAction.class */
    public enum ComputeRoutingAction {
        WAIT,
        SKIP,
        COMPUTE;

        public boolean determined() {
            return this != WAIT;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase$PendingTaskInfo.class */
    public static class PendingTaskInfo {
        private final int index;
        private int inputStats;

        public PendingTaskInfo(int i) {
            this.index = i;
        }

        public String toString() {
            return "[index=" + this.index + ", inputStats=" + this.inputStats + "]";
        }

        public int getIndex() {
            return this.index;
        }

        public int getInputStats() {
            return this.inputStats;
        }

        public boolean setInputStats(int i) {
            if (i <= 0 || this.inputStats == i) {
                return false;
            }
            this.inputStats = i;
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase$ReconfigVertexParams.class */
    public static class ReconfigVertexParams {
        private final int finalParallelism;
        private final VertexLocationHint locationHint;

        public ReconfigVertexParams(int i, VertexLocationHint vertexLocationHint) {
            this.finalParallelism = i;
            this.locationHint = vertexLocationHint;
        }

        public int getFinalParallelism() {
            return this.finalParallelism;
        }

        public VertexLocationHint getLocationHint() {
            return this.locationHint;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase$ShuffleVertexManagerBaseConfig.class */
    public static class ShuffleVertexManagerBaseConfig {
        private final boolean enableAutoParallelism;
        private final long desiredTaskInputDataSize;
        private final float slowStartMinFraction;
        private final float slowStartMaxFraction;

        public ShuffleVertexManagerBaseConfig(boolean z, long j, float f, float f2) {
            if (f < TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT || f2 > 1.0f || f2 < f) {
                throw new IllegalArgumentException("Invalid values for slowStartMinFraction/slowStartMaxFraction. Min cannot be < 0, max cannot be > 1, and max cannot be < min., configuredMin=" + f + ", configuredMax=" + f2);
            }
            this.enableAutoParallelism = z;
            this.desiredTaskInputDataSize = j;
            this.slowStartMinFraction = f;
            this.slowStartMaxFraction = f2;
            ShuffleVertexManagerBase.LOG.info("Settings minFrac: {} maxFrac: {} auto: {} desiredTaskIput: {}", new Object[]{Float.valueOf(f), Float.valueOf(f2), Boolean.valueOf(z), Long.valueOf(j)});
        }

        public boolean isAutoParallelismEnabled() {
            return this.enableAutoParallelism;
        }

        public long getDesiredTaskInputDataSize() {
            return this.desiredTaskInputDataSize;
        }

        public float getMinFraction() {
            return this.slowStartMinFraction;
        }

        public float getMaxFraction() {
            return this.slowStartMaxFraction;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/ShuffleVertexManagerBase$SourceVertexInfo.class */
    public static class SourceVertexInfo {
        final EdgeProperty edgeProperty;
        boolean vertexIsConfigured;
        final BitSet finishedTaskSet = new BitSet();
        int numTasks;
        int numVMEventsReceived;
        long outputSize;
        int[] statsInMB;
        EdgeManagerPluginDescriptor newDescriptor;

        /* JADX INFO: Access modifiers changed from: package-private */
        public SourceVertexInfo(EdgeProperty edgeProperty, int i) {
            this.edgeProperty = edgeProperty;
            this.statsInMB = new int[i];
        }

        int getNumTasks() {
            return this.numTasks;
        }

        int getNumCompletedTasks() {
            return this.finishedTaskSet.cardinality();
        }

        BigInteger getExpectedStatsAtIndex(int i) {
            return this.numVMEventsReceived == 0 ? BigInteger.ZERO : BigInteger.valueOf(this.statsInMB[i]).multiply(BigInteger.valueOf(this.numTasks)).divide(BigInteger.valueOf(this.numVMEventsReceived)).multiply(BigInteger.valueOf(ShuffleVertexManagerBase.MB));
        }
    }

    SourceVertexInfo createSourceVertexInfo(EdgeProperty edgeProperty, int i) {
        return new SourceVertexInfo(edgeProperty, i);
    }

    public ShuffleVertexManagerBase(VertexManagerPluginContext vertexManagerPluginContext) {
        super(vertexManagerPluginContext);
        this.computeRoutingAction = ComputeRoutingAction.WAIT;
        this.totalNumBipartiteSourceTasks = 0;
        this.numBipartiteSourceTasksCompleted = 0;
        this.numVertexManagerEventsReceived = 0;
        this.pendingVMEvents = Lists.newLinkedList();
        this.onVertexStartedDone = new AtomicBoolean(false);
        this.taskWithVmEvents = Sets.newHashSet();
        this.srcVertexInfo = Maps.newConcurrentMap();
        this.sourceVerticesScheduled = false;
        this.bipartiteSources = 0;
        this.completedSourceTasksOutputSize = 0L;
        this.pendingStateUpdates = Lists.newArrayList();
        this.pendingTasks = Lists.newLinkedList();
        this.totalTasksToSchedule = 0;
        this.inflater = TezCommonUtils.newInflater();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public synchronized void onVertexStarted(List<TaskAttemptIdentifier> list) {
        for (Map.Entry entry : getContext().getInputVertexEdgeProperties().entrySet()) {
            this.srcVertexInfo.put(entry.getKey(), createSourceVertexInfo((EdgeProperty) entry.getValue(), getContext().getVertexNumTasks(getContext().getVertexName())));
            getContext().registerForVertexStateUpdates((String) entry.getKey(), EnumSet.of(VertexState.CONFIGURED));
            if (((EdgeProperty) entry.getValue()).getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
                this.bipartiteSources++;
            }
        }
        onVertexStartedCheck();
        Iterator<VertexStateUpdate> it = this.pendingStateUpdates.iterator();
        while (it.hasNext()) {
            handleVertexStateUpdate(it.next());
        }
        this.pendingStateUpdates.clear();
        updatePendingTasks();
        Iterator<VertexManagerEvent> it2 = this.pendingVMEvents.iterator();
        while (it2.hasNext()) {
            handleVertexManagerEvent(it2.next());
        }
        this.pendingVMEvents.clear();
        LOG.info("OnVertexStarted vertex: {} with {} source tasks and {} pending tasks", new Object[]{getContext().getVertexName(), Integer.valueOf(this.totalNumBipartiteSourceTasks), Integer.valueOf(this.totalTasksToSchedule)});
        if (list != null) {
            Iterator<TaskAttemptIdentifier> it3 = list.iterator();
            while (it3.hasNext()) {
                onSourceTaskCompleted(it3.next());
            }
        }
        this.onVertexStartedDone.set(true);
        processPendingTasks(null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onVertexStartedCheck() {
        if (this.bipartiteSources == 0) {
            throw new TezUncheckedException("At least 1 bipartite source should exist");
        }
    }

    public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
        String name = taskAttemptIdentifier.getTaskIdentifier().getVertexIdentifier().getName();
        int identifier = taskAttemptIdentifier.getTaskIdentifier().getIdentifier();
        SourceVertexInfo sourceVertexInfo = this.srcVertexInfo.get(name);
        if (sourceVertexInfo.vertexIsConfigured) {
            Preconditions.checkState(identifier < sourceVertexInfo.numTasks, "Received completion for srcTaskId " + identifier + " but Vertex: " + name + " has only " + sourceVertexInfo.numTasks + " tasks");
        }
        BitSet bitSet = sourceVertexInfo.finishedTaskSet;
        if (!bitSet.get(identifier)) {
            bitSet.set(identifier);
            if (sourceVertexInfo.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
                this.numBipartiteSourceTasksCompleted++;
            }
        }
        processPendingTasks(taskAttemptIdentifier);
    }

    @VisibleForTesting
    void parsePartitionStats(SourceVertexInfo sourceVertexInfo, RoaringBitmap roaringBitmap) {
        Preconditions.checkState(sourceVertexInfo.statsInMB != null, "Stats should be initialized");
        Iterator it = roaringBitmap.iterator();
        DATA_RANGE_IN_MB[] values = DATA_RANGE_IN_MB.values();
        int length = values.length;
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            int i = intValue / length;
            int i2 = intValue % length;
            if (values[i2].getSizeInMB() > 0) {
                int[] iArr = sourceVertexInfo.statsInMB;
                iArr[i] = iArr[i] + values[i2].getSizeInMB();
            }
        }
    }

    void parseDetailedPartitionStats(SourceVertexInfo sourceVertexInfo, List<Integer> list) {
        for (int i = 0; i < list.size(); i++) {
            int[] iArr = sourceVertexInfo.statsInMB;
            int i2 = i;
            iArr[i2] = iArr[i2] + list.get(i).intValue();
        }
    }

    public synchronized void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
        if (this.onVertexStartedDone.get()) {
            handleVertexManagerEvent(vertexManagerEvent);
        } else {
            this.pendingVMEvents.add(vertexManagerEvent);
        }
    }

    private void handleVertexManagerEvent(VertexManagerEvent vertexManagerEvent) {
        TaskIdentifier taskIdentifier = vertexManagerEvent.getProducerAttemptIdentifier().getTaskIdentifier();
        if (!this.taskWithVmEvents.add(taskIdentifier)) {
            LOG.info("Ignoring vertex manager event from: {}", taskIdentifier);
            return;
        }
        SourceVertexInfo sourceVertexInfo = this.srcVertexInfo.get(taskIdentifier.getVertexIdentifier().getName());
        Preconditions.checkState(sourceVertexInfo != null, "Unknown vmEvent from " + taskIdentifier);
        this.numVertexManagerEventsReceived++;
        long j = 0;
        if (vertexManagerEvent.getUserPayload() != null) {
            try {
                ShuffleUserPayloads.VertexManagerEventPayloadProto parseFrom = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vertexManagerEvent.getUserPayload()));
                j = parseFrom.getOutputSize();
                if (parseFrom.hasPartitionStats()) {
                    try {
                        RoaringBitmap roaringBitmap = new RoaringBitmap();
                        roaringBitmap.deserialize(new DataInputStream(new NonSyncByteArrayInputStream(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getPartitionStats(), this.inflater))));
                        parsePartitionStats(sourceVertexInfo, roaringBitmap);
                    } catch (IOException e) {
                        throw new TezUncheckedException(e);
                    }
                } else if (parseFrom.hasDetailedPartitionStats()) {
                    parseDetailedPartitionStats(sourceVertexInfo, parseFrom.getDetailedPartitionStats().getSizeInMbList());
                }
                sourceVertexInfo.numVMEventsReceived++;
                sourceVertexInfo.outputSize += j;
                this.completedSourceTasksOutputSize += j;
            } catch (InvalidProtocolBufferException e2) {
                throw new TezUncheckedException(e2);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("For attempt: {} received info of output size: {} vertex numEventsReceived: {} vertex output size: {} total numEventsReceived: {} total output size: {}", new Object[]{vertexManagerEvent.getProducerAttemptIdentifier(), Long.valueOf(j), Integer.valueOf(sourceVertexInfo.numVMEventsReceived), Long.valueOf(sourceVertexInfo.outputSize), Integer.valueOf(this.numVertexManagerEventsReceived), Long.valueOf(this.completedSourceTasksOutputSize)});
        }
    }

    void updatePendingTasks() {
        int vertexNumTasks = getContext().getVertexNumTasks(getContext().getVertexName());
        if (vertexNumTasks == this.pendingTasks.size() || vertexNumTasks <= 0) {
            return;
        }
        this.pendingTasks.clear();
        for (int i = 0; i < vertexNumTasks; i++) {
            this.pendingTasks.add(new PendingTaskInfo(i));
        }
        this.totalTasksToSchedule = this.pendingTasks.size();
    }

    private ComputeRoutingAction getComputeRoutingAction(float f) {
        if (getNumOfTasksToSchedule(f) <= 0 && this.numBipartiteSourceTasksCompleted != this.totalNumBipartiteSourceTasks) {
            return ComputeRoutingAction.WAIT;
        }
        if (this.numVertexManagerEventsReceived == 0 && this.totalNumBipartiteSourceTasks > 0) {
            return ComputeRoutingAction.SKIP;
        }
        if (this.completedSourceTasksOutputSize >= this.config.getDesiredTaskInputDataSize() || f >= this.config.getMaxFraction()) {
            return ComputeRoutingAction.COMPUTE;
        }
        LOG.info("Defer scheduling tasks; vertex = {}, totalNumBipartiteSourceTasks = {}, completedSourceTasksOutputSize = {}, numVertexManagerEventsReceived = {}, numBipartiteSourceTasksCompleted = {}, minSourceVertexCompletedTaskFraction = {}", new Object[]{getContext().getVertexName(), Integer.valueOf(this.totalNumBipartiteSourceTasks), Long.valueOf(this.completedSourceTasksOutputSize), Integer.valueOf(this.numVertexManagerEventsReceived), Integer.valueOf(this.numBipartiteSourceTasksCompleted), Float.valueOf(f)});
        return ComputeRoutingAction.WAIT;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BigInteger getExpectedTotalBipartiteSourceTasksOutputSize() {
        BigInteger bigInteger = BigInteger.ZERO;
        Iterator<Map.Entry<String, SourceVertexInfo>> it = getBipartiteInfo().iterator();
        while (it.hasNext()) {
            SourceVertexInfo value = it.next().getValue();
            if (value.numTasks > 0 && value.numVMEventsReceived > 0) {
                bigInteger = bigInteger.add(BigInteger.valueOf(value.outputSize).multiply(BigInteger.valueOf(value.numTasks)).divide(BigInteger.valueOf(value.numVMEventsReceived)));
            }
        }
        return bigInteger;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getCurrentlyKnownStatsAtIndex(int i) {
        int i2 = 0;
        Iterator<SourceVertexInfo> it = getAllSourceVertexInfo().iterator();
        while (it.hasNext()) {
            i2 += it.next().statsInMB[i];
        }
        return i2;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getExpectedStatsAtIndex(int i) {
        BigInteger bigInteger = BigInteger.ZERO;
        Iterator<SourceVertexInfo> it = getAllSourceVertexInfo().iterator();
        while (it.hasNext()) {
            bigInteger = bigInteger.add(it.next().getExpectedStatsAtIndex(i));
        }
        if (bigInteger.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) <= 0) {
            return bigInteger.longValue();
        }
        LOG.warn("Partition {}'s size {} exceeded Long.MAX_VALUE", Integer.valueOf(i), bigInteger);
        return Long.MAX_VALUE;
    }

    abstract ReconfigVertexParams computeRouting();

    abstract void postReconfigVertex();

    @VisibleForTesting
    boolean determineParallelismAndApply(float f) {
        ReconfigVertexParams computeRouting;
        if (this.computeRoutingAction.equals(ComputeRoutingAction.WAIT)) {
            ComputeRoutingAction computeRoutingAction = getComputeRoutingAction(f);
            if (computeRoutingAction.equals(ComputeRoutingAction.COMPUTE) && (computeRouting = computeRouting()) != null) {
                reconfigVertex(computeRouting.getFinalParallelism());
                updatePendingTasks();
                postReconfigVertex();
            }
            if (!computeRoutingAction.equals(ComputeRoutingAction.WAIT)) {
                getContext().doneReconfiguringVertex();
            }
            this.computeRoutingAction = computeRoutingAction;
        }
        return this.computeRoutingAction.determined();
    }

    private boolean determineParallelismAndApply() {
        return determineParallelismAndApply(getMinSourceVertexCompletedTaskFraction());
    }

    abstract List<VertexManagerPluginContext.ScheduleTaskRequest> getTasksToSchedule(TaskAttemptIdentifier taskAttemptIdentifier);

    abstract void processPendingTasks();

    private void schedulePendingTasks(TaskAttemptIdentifier taskAttemptIdentifier) {
        List<VertexManagerPluginContext.ScheduleTaskRequest> tasksToSchedule = getTasksToSchedule(taskAttemptIdentifier);
        if (tasksToSchedule == null || tasksToSchedule.size() <= 0) {
            return;
        }
        getContext().scheduleTasks(tasksToSchedule);
    }

    Iterable<SourceVertexInfo> getAllSourceVertexInfo() {
        return this.srcVertexInfo.values();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SourceVertexInfo getSourceVertexInfo(String str) {
        return this.srcVertexInfo.get(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Iterable<Map.Entry<String, SourceVertexInfo>> getBipartiteInfo() {
        return Iterables.filter(this.srcVertexInfo.entrySet(), new Predicate<Map.Entry<String, SourceVertexInfo>>() { // from class: org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase.1
            public boolean apply(Map.Entry<String, SourceVertexInfo> entry) {
                return entry.getValue().edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER;
            }
        });
    }

    private boolean canScheduleTasks() {
        for (Map.Entry<String, SourceVertexInfo> entry : this.srcVertexInfo.entrySet()) {
            if (!entry.getValue().vertexIsConfigured) {
                if (!LOG.isDebugEnabled()) {
                    return false;
                }
                LOG.debug("Waiting for vertex: {} in vertex: {}", entry.getKey(), getContext().getVertexName());
                return false;
            }
        }
        this.sourceVerticesScheduled = true;
        return this.sourceVerticesScheduled;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int getNumOfTasksToScheduleAndLog(float f) {
        int numOfTasksToSchedule = getNumOfTasksToSchedule(f);
        if (numOfTasksToSchedule > 0) {
            LOG.info("Scheduling {} tasks for vertex: {} with totalTasks: {}. {} source tasks completed out of {}. MinSourceTaskCompletedFraction: {} min: {} max: {}", new Object[]{Integer.valueOf(numOfTasksToSchedule), getContext().getVertexName(), Integer.valueOf(this.totalTasksToSchedule), Integer.valueOf(this.numBipartiteSourceTasksCompleted), Integer.valueOf(this.totalNumBipartiteSourceTasks), Float.valueOf(f), Float.valueOf(this.config.getMinFraction()), Float.valueOf(this.config.getMaxFraction())});
        }
        return numOfTasksToSchedule;
    }

    int getNumOfTasksToSchedule(float f) {
        int size = this.pendingTasks.size();
        if (this.numBipartiteSourceTasksCompleted == this.totalNumBipartiteSourceTasks) {
            LOG.info("All source tasks completed. Ramping up {} remaining tasks for vertex: {}", Integer.valueOf(size), getContext().getVertexName());
            return size;
        }
        float f2 = 1.0f;
        float maxFraction = this.config.getMaxFraction() - this.config.getMinFraction();
        if (maxFraction > TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT) {
            f2 = (f - this.config.getMinFraction()) / maxFraction;
        } else if (f < this.config.getMinFraction()) {
            f2 = 0.0f;
        }
        return ((int) Math.ceil(Math.max(TezRuntimeConfiguration.TEZ_RUNTIME_INPUT_BUFFER_PERCENT_DEFAULT, Math.min(1.0f, f2)) * this.totalTasksToSchedule)) - (this.totalTasksToSchedule - size);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public float getMinSourceVertexCompletedTaskFraction() {
        float f = 1.0f;
        if (this.numBipartiteSourceTasksCompleted != this.totalNumBipartiteSourceTasks) {
            for (Map.Entry<String, SourceVertexInfo> entry : getBipartiteInfo()) {
                SourceVertexInfo value = entry.getValue();
                Preconditions.checkState(value.vertexIsConfigured, "Vertex: " + entry.getKey());
                if (value.numTasks > 0) {
                    float numCompletedTasks = value.getNumCompletedTasks() / value.numTasks;
                    if (f > numCompletedTasks) {
                        f = numCompletedTasks;
                    }
                }
            }
        }
        return f;
    }

    private boolean preconditionsSatisfied() {
        if (!this.onVertexStartedDone.get()) {
            return false;
        }
        if (this.sourceVerticesScheduled || canScheduleTasks()) {
            return true;
        }
        if (!LOG.isDebugEnabled()) {
            return false;
        }
        LOG.debug("Defer scheduling tasks for vertex: {} as one task needs to be completed per source vertex", getContext().getVertexName());
        return false;
    }

    private void processPendingTasks(TaskAttemptIdentifier taskAttemptIdentifier) {
        if (preconditionsSatisfied()) {
            if (!this.config.isAutoParallelismEnabled() || determineParallelismAndApply()) {
                processPendingTasks();
                schedulePendingTasks(taskAttemptIdentifier);
            }
        }
    }

    abstract ShuffleVertexManagerBaseConfig initConfiguration();

    public void initialize() {
        try {
            this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
            this.config = initConfiguration();
            updatePendingTasks();
            if (this.config.isAutoParallelismEnabled()) {
                getContext().vertexReconfigurationPlanned();
            }
        } catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    private void handleVertexStateUpdate(VertexStateUpdate vertexStateUpdate) {
        Preconditions.checkArgument(vertexStateUpdate.getVertexState() == VertexState.CONFIGURED, "Received incorrect state notification : " + vertexStateUpdate.getVertexState() + " for vertex: " + vertexStateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
        Preconditions.checkArgument(this.srcVertexInfo.containsKey(vertexStateUpdate.getVertexName()), "Received incorrect vertex notification : " + vertexStateUpdate.getVertexState() + " for vertex: " + vertexStateUpdate.getVertexName() + " in vertex: " + getContext().getVertexName());
        SourceVertexInfo sourceVertexInfo = this.srcVertexInfo.get(vertexStateUpdate.getVertexName());
        Preconditions.checkState(!sourceVertexInfo.vertexIsConfigured);
        sourceVertexInfo.vertexIsConfigured = true;
        sourceVertexInfo.numTasks = getContext().getVertexNumTasks(vertexStateUpdate.getVertexName());
        if (sourceVertexInfo.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.SCATTER_GATHER) {
            this.totalNumBipartiteSourceTasks += sourceVertexInfo.numTasks;
        }
        LOG.info("Received configured notification : {} for vertex: {} in vertex: {} numBipartiteSourceTasks: {}", new Object[]{vertexStateUpdate.getVertexState(), vertexStateUpdate.getVertexName(), getContext().getVertexName(), Integer.valueOf(this.totalNumBipartiteSourceTasks)});
        processPendingTasks(null);
    }

    public synchronized void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        if (vertexStateUpdate.getVertexState() == VertexState.CONFIGURED) {
            if (this.onVertexStartedDone.get()) {
                handleVertexStateUpdate(vertexStateUpdate);
            } else {
                this.pendingStateUpdates.add(vertexStateUpdate);
            }
        }
    }

    public synchronized void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
    }

    private void reconfigVertex(int i) {
        HashMap hashMap = new HashMap(this.bipartiteSources);
        for (Map.Entry<String, SourceVertexInfo> entry : getBipartiteInfo()) {
            String key = entry.getKey();
            EdgeProperty edgeProperty = entry.getValue().edgeProperty;
            hashMap.put(key, EdgeProperty.create(entry.getValue().newDescriptor, edgeProperty.getDataSourceType(), edgeProperty.getSchedulingType(), edgeProperty.getEdgeSource(), edgeProperty.getEdgeDestination()));
        }
        getContext().reconfigureVertex(i, (VertexLocationHint) null, hashMap);
    }
}
