package org.apache.flink.runtime.resourcemanager.active;

import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.ThresholdMeter;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.class */
public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable> extends ResourceManager<WorkerType> implements ResourceEventHandler<WorkerType> {
    /** Configuration handed to the constructor; used to derive the process spec of requested workers. */
    protected final Configuration flinkConfig;
    /** Delay after which the worker-creation cool-down future is completed again. */
    private final Time startWorkerRetryInterval;
    /** Driver to which initialization, termination and resource requests/releases are delegated. */
    private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
    /** Known workers by resource id; filled on recovery and on successful requests, cleared per worker. */
    private final Map<ResourceID, WorkerType> workerNodeMap;
    /** Per-spec counter, increased when a worker is requested and decreased on registration, failure or removal. */
    private final PendingWorkerCounter pendingWorkerCounter;
    /** Workers requested by this instance that have not registered yet, mapped to the spec they were requested with. */
    private final Map<ResourceID, WorkerResourceSpec> currentAttemptUnregisteredWorkers;
    /** Workers recovered from a previous attempt that have not registered yet. */
    private final Set<ResourceID> previousAttemptUnregisteredWorkers;
    /** Meter marked on every start-worker failure; exceeding its threshold triggers the cool-down. */
    private final ThresholdMeter startWorkerFailureRater;
    /** Delay after which a still-unregistered worker is stopped. */
    private final Time workerRegistrationTimeout;
    /** Future that gates new resource requests; starts out completed and is replaced while cooling down. */
    private CompletableFuture<Void> startWorkerCoolDown;

    /**
     * {@link ScheduledExecutor} that forwards every call to the main thread executor of the
     * enclosing {@link ActiveResourceManager}. The executor is looked up anew on each call
     * rather than being cached.
     */
    private class GatewayMainThreadExecutor implements ScheduledExecutor {
        private GatewayMainThreadExecutor() {
        }

        /** Fetches the enclosing resource manager's main thread executor. */
        private ScheduledExecutor mainThreadExecutor() {
            return ActiveResourceManager.this.getMainThreadExecutor();
        }

        /** Schedules a one-shot runnable; {@code j} is the delay expressed in {@code timeUnit}. */
        @Override
        public ScheduledFuture<?> schedule(Runnable runnable, long j, TimeUnit timeUnit) {
            return mainThreadExecutor().schedule(runnable, j, timeUnit);
        }

        /** Schedules a one-shot callable; {@code j} is the delay expressed in {@code timeUnit}. */
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long j, TimeUnit timeUnit) {
            return mainThreadExecutor().schedule(callable, j, timeUnit);
        }

        /** Schedules at a fixed rate; {@code j} is the initial delay and {@code j2} the period. */
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            return mainThreadExecutor().scheduleAtFixedRate(runnable, j, j2, timeUnit);
        }

        /** Schedules with a fixed delay; {@code j} is the initial delay and {@code j2} the delay between runs. */
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            return mainThreadExecutor().scheduleWithFixedDelay(runnable, j, j2, timeUnit);
        }

        /** Hands the runnable to the main thread executor. */
        @Override
        public void execute(Runnable runnable) {
            mainThreadExecutor().execute(runnable);
        }
    }

    /**
     * Creates an active resource manager that obtains and releases workers through the given driver.
     *
     * @param resourceManagerDriver driver all resource operations are delegated to; must not be null
     * @param configuration Flink configuration; must not be null
     * @param thresholdMeter meter tracking start-worker failures; must not be null
     * @param duration interval to wait before requesting workers again after too many failures
     * @param duration2 time a requested or recovered worker is given to register
     * @param executor executor passed on to the base resource manager
     * @throws NullPointerException if the driver, the configuration, the meter or one of the durations is null
     */
    public ActiveResourceManager(
            ResourceManagerDriver<WorkerType> resourceManagerDriver,
            Configuration configuration,
            RpcService rpcService,
            ResourceID resourceID,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            SlotManager slotManager,
            ResourceManagerPartitionTrackerFactory resourceManagerPartitionTrackerFactory,
            JobLeaderIdService jobLeaderIdService,
            ClusterInformation clusterInformation,
            FatalErrorHandler fatalErrorHandler,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            ThresholdMeter thresholdMeter,
            Duration duration,
            Duration duration2,
            Executor executor) {
        super(
                rpcService,
                resourceID,
                highAvailabilityServices,
                heartbeatServices,
                slotManager,
                resourceManagerPartitionTrackerFactory,
                jobLeaderIdService,
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup,
                AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(configuration)),
                executor);
        this.flinkConfig = configuration;
        // Fail fast here instead of with an NPE on the first delegated driver call.
        this.resourceManagerDriver = Preconditions.checkNotNull(resourceManagerDriver);
        this.workerNodeMap = new HashMap<>();
        this.pendingWorkerCounter = new PendingWorkerCounter();
        this.currentAttemptUnregisteredWorkers = new HashMap<>();
        this.previousAttemptUnregisteredWorkers = new HashSet<>();
        this.startWorkerFailureRater = Preconditions.checkNotNull(thresholdMeter);
        this.startWorkerRetryInterval = Time.of(duration.toMillis(), TimeUnit.MILLISECONDS);
        this.workerRegistrationTimeout = Time.of(duration2.toMillis(), TimeUnit.MILLISECONDS);
        // No cool-down is active initially, so the first requests go out immediately.
        this.startWorkerCoolDown = FutureUtils.completedVoidFuture();
    }

    /**
     * Initializes the driver with this resource manager as its event handler, a gateway to the
     * main thread executor and the IO executor.
     *
     * @throws ResourceManagerException wrapping any exception thrown by the driver
     */
    @Override
    protected void initialize() throws ResourceManagerException {
        final GatewayMainThreadExecutor mainThreadGateway = new GatewayMainThreadExecutor();
        try {
            resourceManagerDriver.initialize(this, mainThreadGateway, ioExecutor);
        } catch (Exception cause) {
            throw new ResourceManagerException("Cannot initialize resource provider.", cause);
        }
    }

    /**
     * Terminates the driver, blocking until the future returned by the driver completes.
     *
     * @throws ResourceManagerException if the driver fails to terminate or the wait is interrupted;
     *     in the latter case the thread's interrupt flag is restored before throwing
     */
    @Override
    protected void terminate() throws ResourceManagerException {
        try {
            this.resourceManagerDriver.terminate().get();
        } catch (InterruptedException e) {
            // Future.get() clears the interrupt status; restore it so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new ResourceManagerException("Cannot terminate resource provider.", e);
        } catch (Exception e) {
            throw new ResourceManagerException("Cannot terminate resource provider.", e);
        }
    }

    /**
     * Forwards the leadership grant to the driver.
     *
     * @return the future returned by the driver's {@code onGrantLeadership()}
     */
    @Override
    protected CompletableFuture<Void> prepareLeadershipAsync() {
        final CompletableFuture<Void> driverReady = resourceManagerDriver.onGrantLeadership();
        return driverReady;
    }

    /**
     * Forwards the leadership revocation to the driver.
     *
     * @return the future returned by the driver's {@code onRevokeLeadership()}
     */
    @Override
    protected CompletableFuture<Void> clearStateAsync() {
        final CompletableFuture<Void> driverCleared = resourceManagerDriver.onRevokeLeadership();
        return driverCleared;
    }

    /**
     * Asks the driver to deregister the application.
     *
     * @param applicationStatus final status handed to the driver
     * @param str optional diagnostics message handed to the driver, may be null
     * @throws ResourceManagerException wrapping any exception thrown by the driver
     */
    @Override
    protected void internalDeregisterApplication(ApplicationStatus applicationStatus, @Nullable String str) throws ResourceManagerException {
        try {
            resourceManagerDriver.deregisterApplication(applicationStatus, str);
        } catch (Exception cause) {
            throw new ResourceManagerException("Cannot deregister application.", cause);
        }
    }

    /**
     * Triggers a request for one new worker with the given resource spec.
     *
     * @param workerResourceSpec resource spec of the worker to request
     * @return always {@code true}
     */
    @Override
    public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
        this.requestNewWorker(workerResourceSpec);
        final boolean requestIssued = true;
        return requestIssued;
    }

    /**
     * Looks up the worker known under the given resource id.
     *
     * @param resourceID id of the worker
     * @return the worker stored for that id, or {@code null} if none is stored
     */
    @Override
    protected WorkerType workerStarted(ResourceID resourceID) {
        final WorkerType knownWorker = workerNodeMap.get(resourceID);
        return knownWorker;
    }

    /**
     * Stops the given worker via {@code internalStopWorker}.
     *
     * @param workertype worker to stop
     * @return always {@code true}
     */
    @Override
    public boolean stopWorker(WorkerType workertype) {
        final ResourceID workerId = workertype.getResourceID();
        internalStopWorker(workerId);
        return true;
    }

    /**
     * Removes a newly registered worker from both unregistered-worker collections and, if it was
     * requested in the current attempt, decreases the pending count for its resource spec.
     *
     * @param workertype the worker that registered
     */
    @Override
    protected void onWorkerRegistered(WorkerType workertype) {
        final ResourceID workerId = workertype.getResourceID();
        log.info("Worker {} is registered.", workerId.getStringWithMetadata());

        final WorkerResourceSpec requestedSpec = currentAttemptUnregisteredWorkers.remove(workerId);
        previousAttemptUnregisteredWorkers.remove(workerId);
        if (requestedSpec == null) {
            // Not requested in the current attempt, so there is no pending count to adjust.
            return;
        }

        final int pendingCount = pendingWorkerCounter.decreaseAndGet(requestedSpec);
        log.info(
                "Worker {} with resource spec {} was requested in current attempt. Current pending count after registering: {}.",
                workerId.getStringWithMetadata(),
                requestedSpec,
                pendingCount);
    }

    /**
     * Registers the metrics of the base resource manager and additionally exposes the
     * start-worker failure meter under {@link MetricNames#START_WORKER_FAILURE_RATE}.
     */
    @Override
    public void registerMetrics() {
        super.registerMetrics();
        // Pass the meter itself; casting it to String (a decompiler artifact) does not compile.
        this.resourceManagerMetricGroup.meter(MetricNames.START_WORKER_FAILURE_RATE, this.startWorkerFailureRater);
    }

    /**
     * Takes over workers recovered from a previous attempt: each one is stored, marked as not yet
     * registered and given a registration timeout check. Asserts that it runs in the main thread.
     *
     * @param collection the recovered workers
     */
    @Override
    public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> collection) {
        getMainThreadExecutor().assertRunningInMainThread();

        final int recoveredCount = collection.size();
        log.info("Recovered {} workers from previous attempt.", recoveredCount);

        for (final WorkerType recoveredWorker : collection) {
            final ResourceID workerId = recoveredWorker.getResourceID();
            workerNodeMap.put(workerId, recoveredWorker);
            previousAttemptUnregisteredWorkers.add(workerId);
            scheduleWorkerRegistrationTimeoutCheck(workerId);
            log.info("Worker {} recovered from previous attempt.", workerId.getStringWithMetadata());
        }
    }

    /**
     * Handles the termination of a worker. A worker that was requested in the current attempt and
     * never registered counts as a start-worker failure. If the worker was known, its state is
     * cleared and replacements are requested as required. The task manager connection is closed
     * in every case.
     *
     * @param resourceID id of the terminated worker
     * @param str diagnostics describing the termination
     */
    @Override
    public void onWorkerTerminated(ResourceID resourceID, String str) {
        // Must be checked before the state is cleared below.
        final boolean neverRegistered = currentAttemptUnregisteredWorkers.containsKey(resourceID);
        if (neverRegistered) {
            recordWorkerFailureAndPauseWorkerCreationIfNeeded();
        }

        final boolean wasKnownWorker = clearStateForWorker(resourceID);
        if (wasKnownWorker) {
            log.info("Worker {} is terminated. Diagnostics: {}", resourceID.getStringWithMetadata(), str);
            requestWorkerIfRequired();
        }

        final Exception terminationCause = new Exception(str);
        closeTaskManagerConnection(resourceID, terminationCause);
    }

    /**
     * Escalates an error reported through the resource event handler interface to
     * {@code onFatalError}.
     *
     * @param th the reported error
     */
    @Override
    public void onError(Throwable th) {
        this.onFatalError(th);
    }

    /**
     * Requests one new worker with the given resource spec from the driver.
     *
     * <p>The pending count for the spec is increased right away. The actual driver request is
     * chained behind {@code startWorkerCoolDown}, so it is deferred while a cool-down is active.
     * On failure the pending count is decreased again, the failure is recorded and missing
     * workers are re-requested. On success the worker is stored as unregistered and a
     * registration timeout check is scheduled.
     *
     * @param workerResourceSpec resource spec of the worker to request
     */
    private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
        final TaskExecutorProcessSpec taskExecutorProcessSpec =
                TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(this.flinkConfig, workerResourceSpec);
        final int pendingCount = this.pendingWorkerCounter.increaseAndGet(workerResourceSpec);

        this.log.info(
                "Requesting new worker with resource spec {}, current pending count: {}.",
                workerResourceSpec,
                pendingCount);

        // Wait for a possibly running cool-down before the driver is actually asked.
        final CompletableFuture<WorkerType> requestResourceFuture =
                this.startWorkerCoolDown.thenCompose(
                        ignored -> this.resourceManagerDriver.requestResource(taskExecutorProcessSpec));

        // The handler returns normally on both paths; assertNoException guards against anything
        // thrown from inside the handler itself.
        FutureUtils.assertNoException(
                requestResourceFuture.handle(
                        (worker, exception) -> {
                            if (exception != null) {
                                final int count = this.pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
                                this.log.warn(
                                        "Failed requesting worker with resource spec {}, current pending count: {}",
                                        workerResourceSpec,
                                        count,
                                        exception);
                                recordWorkerFailureAndPauseWorkerCreationIfNeeded();
                                requestWorkerIfRequired();
                            } else {
                                final ResourceID resourceId = worker.getResourceID();
                                this.workerNodeMap.put(resourceId, worker);
                                this.currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
                                scheduleWorkerRegistrationTimeoutCheck(resourceId);
                                this.log.info(
                                        "Requested worker {} with resource spec {}.",
                                        resourceId.getStringWithMetadata(),
                                        workerResourceSpec);
                            }
                            return null;
                        }));
    }

    /**
     * Schedules a check that runs after {@code workerRegistrationTimeout}: if the worker is still
     * listed as unregistered (from the current or a previous attempt) at that point, it is
     * stopped and replacements are requested as required.
     *
     * @param resourceID id of the worker to check
     */
    private void scheduleWorkerRegistrationTimeoutCheck(ResourceID resourceID) {
        scheduleRunAsync(
                () -> {
                    final boolean stillUnregistered =
                            this.currentAttemptUnregisteredWorkers.containsKey(resourceID)
                                    || this.previousAttemptUnregisteredWorkers.contains(resourceID);
                    if (!stillUnregistered) {
                        return;
                    }
                    // Log the id with metadata, like every other worker log statement in this class.
                    this.log.warn(
                            "Worker {} did not register in {}, will stop it and request a new one if needed.",
                            resourceID.getStringWithMetadata(),
                            this.workerRegistrationTimeout);
                    internalStopWorker(resourceID);
                    requestWorkerIfRequired();
                },
                this.workerRegistrationTimeout);
    }

    /**
     * Releases the resource of the given worker through the driver (if the worker is known) and
     * clears its bookkeeping state. Does nothing but log a warning when this resource manager
     * does not hold leadership.
     *
     * @param resourceID id of the worker to stop
     */
    private void internalStopWorker(ResourceID resourceID) {
        if (hasLeadership()) {
            log.info("Stopping worker {}.", resourceID.getStringWithMetadata());

            final WorkerType knownWorker = workerNodeMap.get(resourceID);
            if (knownWorker != null) {
                resourceManagerDriver.releaseResource(knownWorker);
            }

            clearStateForWorker(resourceID);
        } else {
            log.warn("Cannot stop worker {}. Does not have leadership.", resourceID.getStringWithMetadata());
        }
    }

    /**
     * Drops all bookkeeping for the given worker. If the worker was requested in the current
     * attempt and had not registered yet, the pending count for its spec is decreased.
     *
     * @param resourceID id of the worker
     * @return {@code false} if the worker was not known, {@code true} otherwise
     */
    private boolean clearStateForWorker(ResourceID resourceID) {
        final WorkerType removedWorker = workerNodeMap.remove(resourceID);
        if (removedWorker == null) {
            log.debug("Ignore unrecognized worker {}.", resourceID.getStringWithMetadata());
            return false;
        }

        final WorkerResourceSpec unregisteredSpec = currentAttemptUnregisteredWorkers.remove(resourceID);
        previousAttemptUnregisteredWorkers.remove(resourceID);

        if (unregisteredSpec != null) {
            final int pendingCount = pendingWorkerCounter.decreaseAndGet(unregisteredSpec);
            log.info(
                    "Worker {} with resource spec {} was requested in current attempt and has not registered. Current pending count after removing: {}.",
                    resourceID.getStringWithMetadata(),
                    unregisteredSpec,
                    pendingCount);
        }
        return true;
    }

    /**
     * For every required resource spec, requests new workers until the pending count for that
     * spec is no longer below the required count.
     */
    private void requestWorkerIfRequired() {
        final Map<WorkerResourceSpec, Integer> requiredResources = getRequiredResources();
        for (final Map.Entry<WorkerResourceSpec, Integer> requirement : requiredResources.entrySet()) {
            final WorkerResourceSpec spec = requirement.getKey();
            final int requiredCount = requirement.getValue();

            // Each request increases the pending count, which is what ends this loop.
            while (pendingWorkerCounter.getNum(spec) < requiredCount) {
                requestNewWorker(spec);
            }
        }
    }

    /**
     * Records a start-worker failure and, if that pushes the failure rate over the threshold,
     * tries to start a worker-creation cool-down.
     */
    private void recordWorkerFailureAndPauseWorkerCreationIfNeeded() {
        final boolean thresholdExceeded = recordStartWorkerFailure();
        if (!thresholdExceeded) {
            return;
        }
        tryResetWorkerCreationCoolDown();
    }

    /**
     * Marks one failure event on the start-worker failure meter and checks it against the
     * meter's threshold.
     *
     * @return {@code true} if the threshold check failed, {@code false} otherwise
     */
    private boolean recordStartWorkerFailure() {
        startWorkerFailureRater.markEvent();

        boolean thresholdExceeded = false;
        try {
            startWorkerFailureRater.checkAgainstThreshold();
        } catch (ThresholdMeter.ThresholdExceedException exceeded) {
            log.warn("Reaching max start worker failure rate: {}", exceeded.getMessage());
            thresholdExceeded = true;
        }
        return thresholdExceeded;
    }

    /**
     * Starts a new cool-down unless one is still running: the cool-down future is replaced by an
     * incomplete one, which is completed after {@code startWorkerRetryInterval}.
     */
    private void tryResetWorkerCreationCoolDown() {
        if (!startWorkerCoolDown.isDone()) {
            // A cool-down is already in progress; leave it untouched.
            return;
        }

        log.info("Will not retry creating worker in {}.", startWorkerRetryInterval);
        startWorkerCoolDown = new CompletableFuture<>();
        scheduleRunAsync(() -> startWorkerCoolDown.complete(null), startWorkerRetryInterval);
    }

    /**
     * Test hook that passes the callable on to {@code callAsync} with the given timeout.
     *
     * @param callable computation to run
     * @param time timeout handed to {@code callAsync}
     * @param <T> result type of the callable
     * @return the future returned by {@code callAsync}
     */
    @VisibleForTesting
    <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time time) {
        final CompletableFuture<T> resultFuture = callAsync(callable, time);
        return resultFuture;
    }
}
