package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.SchedulingTopologyListener;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy.class */
public class VertexwiseSchedulingStrategy implements SchedulingStrategy, SchedulingTopologyListener {
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final Set<ExecutionVertexID> newVertices = new HashSet();

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/strategy/VertexwiseSchedulingStrategy$Factory.class */
    public static class Factory implements SchedulingStrategyFactory {
        @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
            return new VertexwiseSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }

    public VertexwiseSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations) Preconditions.checkNotNull(schedulerOperations);
        this.schedulingTopology = (SchedulingTopology) Preconditions.checkNotNull(schedulingTopology);
        schedulingTopology.registerSchedulingTopologyListener(this);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void startScheduling() {
        maybeScheduleVertices((Set) IterableUtils.toStream(this.schedulingTopology.getVertices()).filter(schedulingExecutionVertex -> {
            return schedulingExecutionVertex.getConsumedPartitionGroups().isEmpty();
        }).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet()));
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void restartTasks(Set<ExecutionVertexID> set) {
        maybeScheduleVertices(set);
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onExecutionStateChange(ExecutionVertexID executionVertexID, ExecutionState executionState) {
        if (executionState == ExecutionState.FINISHED) {
            maybeScheduleVertices((Set) IterableUtils.toStream(this.schedulingTopology.getVertex(executionVertexID).getProducedResults()).map((v0) -> {
                return v0.getConsumerVertexGroups();
            }).flatMap((v0) -> {
                return v0.stream();
            }).flatMap((v0) -> {
                return IterableUtils.toStream(v0);
            }).collect(Collectors.toSet()));
        }
    }

    @Override // org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy
    public void onPartitionConsumable(IntermediateResultPartitionID intermediateResultPartitionID) {
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulingTopologyListener
    public void notifySchedulingTopologyUpdated(SchedulingTopology schedulingTopology, List<ExecutionVertexID> list) {
        Preconditions.checkState(schedulingTopology == this.schedulingTopology);
        this.newVertices.addAll(list);
    }

    private void maybeScheduleVertices(Set<ExecutionVertexID> set) {
        Set<ExecutionVertexID> hashSet;
        HashMap hashMap = new HashMap();
        if (this.newVertices.isEmpty()) {
            hashSet = set;
        } else {
            hashSet = new HashSet(set);
            hashSet.addAll(this.newVertices);
            this.newVertices.clear();
        }
        scheduleVerticesOneByOne((Set) hashSet.stream().filter(executionVertexID -> {
            SchedulingExecutionVertex vertex = this.schedulingTopology.getVertex(executionVertexID);
            Preconditions.checkState(vertex.getState() == ExecutionState.CREATED);
            return areVertexInputsAllConsumable(vertex, hashMap);
        }).collect(Collectors.toSet()));
    }

    private void scheduleVerticesOneByOne(Set<ExecutionVertexID> set) {
        if (set.isEmpty()) {
            return;
        }
        SchedulingStrategyUtils.sortExecutionVerticesInTopologicalOrder(this.schedulingTopology, set).forEach(executionVertexID -> {
            this.schedulerOperations.allocateSlotsAndDeploy(Collections.singletonList(executionVertexID));
        });
    }

    private boolean areVertexInputsAllConsumable(SchedulingExecutionVertex schedulingExecutionVertex, Map<ConsumedPartitionGroup, Boolean> map) {
        Iterator<ConsumedPartitionGroup> it = schedulingExecutionVertex.getConsumedPartitionGroups().iterator();
        while (it.hasNext()) {
            if (!map.computeIfAbsent(it.next(), this::isConsumedPartitionGroupConsumable).booleanValue()) {
                return false;
            }
        }
        return true;
    }

    private boolean isConsumedPartitionGroupConsumable(ConsumedPartitionGroup consumedPartitionGroup) {
        Iterator<IntermediateResultPartitionID> it = consumedPartitionGroup.iterator();
        while (it.hasNext()) {
            if (this.schedulingTopology.getResultPartition(it.next()).getState() != ResultPartitionState.CONSUMABLE) {
                return false;
            }
        }
        return true;
    }
}
