package org.apache.flink.runtime.scheduler;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultOperatorCoordinatorHandler.class */
public class DefaultOperatorCoordinatorHandler implements OperatorCoordinatorHandler {
    private final ExecutionGraph executionGraph;
    private final Map<OperatorID, OperatorCoordinatorHolder> coordinatorMap;
    private final GlobalFailureHandler globalFailureHandler;

    public DefaultOperatorCoordinatorHandler(ExecutionGraph executionGraph, GlobalFailureHandler globalFailureHandler) {
        this.executionGraph = executionGraph;
        this.coordinatorMap = createCoordinatorMap(executionGraph);
        this.globalFailureHandler = globalFailureHandler;
    }

    private static Map<OperatorID, OperatorCoordinatorHolder> createCoordinatorMap(ExecutionGraph executionGraph) {
        return (Map) executionGraph.getAllVertices().values().stream().filter((v0) -> {
            return v0.isInitialized();
        }).flatMap(executionJobVertex -> {
            return executionJobVertex.getOperatorCoordinators().stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.operatorId();
        }, Function.identity()));
    }

    @Override // org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler
    public void initializeOperatorCoordinators(ComponentMainThreadExecutor componentMainThreadExecutor) {
        Iterator<OperatorCoordinatorHolder> it = this.coordinatorMap.values().iterator();
        while (it.hasNext()) {
            it.next().lazyInitialize(this.globalFailureHandler, componentMainThreadExecutor, this.executionGraph.getCheckpointCoordinator());
        }
    }

    @Override // org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler
    public void startAllOperatorCoordinators() {
        startOperatorCoordinators(this.coordinatorMap.values());
    }

    @Override // org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler
    public void disposeAllOperatorCoordinators() {
        this.coordinatorMap.values().forEach((v0) -> {
            IOUtils.closeQuietly(v0);
        });
    }

    @Override // org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler
    public void deliverOperatorEventToCoordinator(ExecutionAttemptID executionAttemptID, OperatorID operatorID, OperatorEvent operatorEvent) throws FlinkException {
        Execution execution = this.executionGraph.getRegisteredExecutions().get(executionAttemptID);
        if (execution == null || !(execution.getState() == ExecutionState.RUNNING || execution.getState() == ExecutionState.INITIALIZING)) {
            throw new TaskNotRunningException("Task is not known or in state running on the JobManager.");
        }
        OperatorCoordinatorHolder operatorCoordinatorHolder = this.coordinatorMap.get(operatorID);
        if (operatorCoordinatorHolder == null) {
            throw new FlinkException("No coordinator registered for operator " + operatorID);
        }
        try {
            operatorCoordinatorHolder.handleEventFromOperator(execution.getParallelSubtaskIndex(), execution.getAttemptNumber(), operatorEvent);
        } catch (Throwable th) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
            this.globalFailureHandler.handleGlobalFailure(th);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operatorID, CoordinationRequest coordinationRequest) throws FlinkException {
        OperatorCoordinatorHolder operatorCoordinatorHolder = this.coordinatorMap.get(operatorID);
        if (operatorCoordinatorHolder == null) {
            throw new FlinkException("Coordinator of operator " + operatorID + " does not exist or the job vertex this operator belongs to is not initialized.");
        }
        OperatorCoordinator coordinator = operatorCoordinatorHolder.coordinator();
        if (coordinator instanceof CoordinationRequestHandler) {
            return ((CoordinationRequestHandler) coordinator).handleCoordinationRequest(coordinationRequest);
        }
        throw new FlinkException("Coordinator of operator " + operatorID + " cannot handle client event");
    }

    @Override // org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler
    public void registerAndStartNewCoordinators(Collection<OperatorCoordinatorHolder> collection, ComponentMainThreadExecutor componentMainThreadExecutor, int i) {
        for (OperatorCoordinatorHolder operatorCoordinatorHolder : collection) {
            this.coordinatorMap.put(operatorCoordinatorHolder.operatorId(), operatorCoordinatorHolder);
            operatorCoordinatorHolder.lazyInitialize(this.globalFailureHandler, componentMainThreadExecutor, this.executionGraph.getCheckpointCoordinator(), i);
        }
        startOperatorCoordinators(collection);
    }

    private void startOperatorCoordinators(Collection<OperatorCoordinatorHolder> collection) {
        try {
            Iterator<OperatorCoordinatorHolder> it = collection.iterator();
            while (it.hasNext()) {
                it.next().start();
            }
        } catch (Throwable th) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
            collection.forEach((v0) -> {
                IOUtils.closeQuietly(v0);
            });
            throw new FlinkRuntimeException("Failed to start the operator coordinators", th);
        }
    }

    @VisibleForTesting
    Map<OperatorID, OperatorCoordinatorHolder> getCoordinatorMap() {
        return this.coordinatorMap;
    }
}
