package org.apache.flink.runtime.checkpoint;

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator.class */
public class CheckpointCoordinator {
    private static final Logger LOG;
    private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
    private final JobID job;
    private final CheckpointProperties checkpointProperties;
    private final Executor executor;
    private final ExecutionVertex[] tasksToTrigger;
    private final ExecutionVertex[] tasksToWaitFor;
    private final ExecutionVertex[] tasksToCommitTo;
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    private final CompletedCheckpointStore completedCheckpointStore;
    private final CheckpointStorage checkpointStorage;
    private final ArrayDeque<Long> recentPendingCheckpoints;
    private final CheckpointIDCounter checkpointIdCounter;
    private final long baseInterval;
    private final long checkpointTimeout;
    private final long minPauseBetweenCheckpointsNanos;
    private final int maxConcurrentCheckpointAttempts;
    private final ScheduledThreadPoolExecutor timer;
    private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
    private JobStatusListener jobStatusListener;
    private ScheduledFuture<?> currentPeriodicTrigger;
    private long lastCheckpointCompletionNanos;
    private boolean periodicScheduling;
    private boolean triggerRequestQueued;
    private volatile boolean shutdown;

    @Nullable
    private CheckpointStatsTracker statsTracker;
    private final SharedStateRegistryFactory sharedStateRegistryFactory;
    private SharedStateRegistry sharedStateRegistry;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Object lock = new Object();
    private final Object triggerLock = new Object();
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinator$ScheduledTrigger.class */
    public final class ScheduledTrigger implements Runnable {
        private ScheduledTrigger() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                CheckpointCoordinator.this.triggerCheckpoint(System.currentTimeMillis(), true);
            } catch (Exception e) {
                CheckpointCoordinator.LOG.error("Exception while triggering checkpoint for job {}.", CheckpointCoordinator.this.job, e);
            }
        }
    }

    public CheckpointCoordinator(JobID jobID, long j, long j2, long j3, int i, CheckpointRetentionPolicy checkpointRetentionPolicy, ExecutionVertex[] executionVertexArr, ExecutionVertex[] executionVertexArr2, ExecutionVertex[] executionVertexArr3, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore completedCheckpointStore, StateBackend stateBackend, Executor executor, SharedStateRegistryFactory sharedStateRegistryFactory) {
        Preconditions.checkNotNull(stateBackend);
        Preconditions.checkArgument(j > 0, "Checkpoint base interval must be larger than zero");
        Preconditions.checkArgument(j2 >= 1, "Checkpoint timeout must be larger than zero");
        Preconditions.checkArgument(j3 >= 0, "minPauseBetweenCheckpoints must be >= 0");
        Preconditions.checkArgument(i >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
        j3 = j3 > 31536000000L ? 31536000000L : j3;
        j = j < j3 ? j3 : j;
        this.job = (JobID) Preconditions.checkNotNull(jobID);
        this.baseInterval = j;
        this.checkpointTimeout = j2;
        this.minPauseBetweenCheckpointsNanos = j3 * 1000000;
        this.maxConcurrentCheckpointAttempts = i;
        this.tasksToTrigger = (ExecutionVertex[]) Preconditions.checkNotNull(executionVertexArr);
        this.tasksToWaitFor = (ExecutionVertex[]) Preconditions.checkNotNull(executionVertexArr2);
        this.tasksToCommitTo = (ExecutionVertex[]) Preconditions.checkNotNull(executionVertexArr3);
        this.pendingCheckpoints = new LinkedHashMap();
        this.checkpointIdCounter = (CheckpointIDCounter) Preconditions.checkNotNull(checkpointIDCounter);
        this.completedCheckpointStore = (CompletedCheckpointStore) Preconditions.checkNotNull(completedCheckpointStore);
        this.executor = (Executor) Preconditions.checkNotNull(executor);
        this.sharedStateRegistryFactory = (SharedStateRegistryFactory) Preconditions.checkNotNull(sharedStateRegistryFactory);
        this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
        this.recentPendingCheckpoints = new ArrayDeque<>(16);
        this.masterHooks = new HashMap<>();
        this.timer = new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
        this.timer.setRemoveOnCancelPolicy(true);
        this.timer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        this.checkpointProperties = CheckpointProperties.forCheckpoint(checkpointRetentionPolicy);
        try {
            this.checkpointStorage = stateBackend.createCheckpointStorage(jobID);
            checkpointIDCounter.start();
        } catch (Throwable th) {
            throw new RuntimeException("Failed to start checkpoint ID counter: " + th.getMessage(), th);
        }
    }

    public boolean addMasterHook(MasterTriggerRestoreHook<?> masterTriggerRestoreHook) {
        Preconditions.checkNotNull(masterTriggerRestoreHook);
        String identifier = masterTriggerRestoreHook.getIdentifier();
        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(identifier), "The hook has a null or empty id");
        synchronized (this.lock) {
            if (this.masterHooks.containsKey(identifier)) {
                return false;
            }
            this.masterHooks.put(identifier, masterTriggerRestoreHook);
            return true;
        }
    }

    public int getNumberOfRegisteredMasterHooks() {
        int size;
        synchronized (this.lock) {
            size = this.masterHooks.size();
        }
        return size;
    }

    public void setCheckpointStatsTracker(@Nullable CheckpointStatsTracker checkpointStatsTracker) {
        this.statsTracker = checkpointStatsTracker;
    }

    public void shutdown(JobStatus jobStatus) throws Exception {
        synchronized (this.lock) {
            if (!this.shutdown) {
                this.shutdown = true;
                LOG.info("Stopping checkpoint coordinator for job {}.", this.job);
                this.periodicScheduling = false;
                this.triggerRequestQueued = false;
                MasterHooks.close(this.masterHooks.values(), LOG);
                this.masterHooks.clear();
                this.timer.shutdownNow();
                Iterator<PendingCheckpoint> it = this.pendingCheckpoints.values().iterator();
                while (it.hasNext()) {
                    it.next().abortError(new Exception("Checkpoint Coordinator is shutting down"));
                }
                this.pendingCheckpoints.clear();
                this.completedCheckpointStore.shutdown(jobStatus);
                this.checkpointIdCounter.shutdown(jobStatus);
            }
        }
    }

    public boolean isShutdown() {
        return this.shutdown;
    }

    public CompletableFuture<CompletedCheckpoint> triggerSavepoint(long j, @Nullable String str) {
        CheckpointTriggerResult triggerCheckpoint = triggerCheckpoint(j, CheckpointProperties.forSavepoint(), str, false);
        return triggerCheckpoint.isSuccess() ? triggerCheckpoint.getPendingCheckpoint().getCompletionFuture() : FutureUtils.completedExceptionally(new CheckpointTriggerException("Failed to trigger savepoint.", triggerCheckpoint.getFailureReason()));
    }

    public boolean triggerCheckpoint(long j, boolean z) {
        return triggerCheckpoint(j, this.checkpointProperties, null, z).isSuccess();
    }

    @VisibleForTesting
    public CheckpointTriggerResult triggerCheckpoint(long j, CheckpointProperties checkpointProperties, @Nullable String str, boolean z) {
        synchronized (this.lock) {
            if (this.shutdown) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
            }
            if (z && !this.periodicScheduling) {
                return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
            }
            if (!checkpointProperties.forceCheckpoint()) {
                if (this.triggerRequestQueued) {
                    LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", this.job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                }
                if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpointAttempts) {
                    this.triggerRequestQueued = true;
                    if (this.currentPeriodicTrigger != null) {
                        this.currentPeriodicTrigger.cancel(false);
                        this.currentPeriodicTrigger = null;
                    }
                    return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                }
                long nanoTime = ((this.lastCheckpointCompletionNanos + this.minPauseBetweenCheckpointsNanos) - System.nanoTime()) / 1000000;
                if (nanoTime > 0) {
                    if (this.currentPeriodicTrigger != null) {
                        this.currentPeriodicTrigger.cancel(false);
                        this.currentPeriodicTrigger = null;
                    }
                    this.currentPeriodicTrigger = this.timer.scheduleAtFixedRate(new ScheduledTrigger(), nanoTime, this.baseInterval, TimeUnit.MILLISECONDS);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                }
            }
            Execution[] executionArr = new Execution[this.tasksToTrigger.length];
            for (int i = 0; i < this.tasksToTrigger.length; i++) {
                Execution currentExecutionAttempt = this.tasksToTrigger[i].getCurrentExecutionAttempt();
                if (currentExecutionAttempt == null) {
                    LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.", this.tasksToTrigger[i].getTaskNameWithSubtaskIndex(), this.job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
                if (currentExecutionAttempt.getState() != ExecutionState.RUNNING) {
                    LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.", new Object[]{this.tasksToTrigger[i].getTaskNameWithSubtaskIndex(), this.job, ExecutionState.RUNNING, currentExecutionAttempt.getState()});
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
                executionArr[i] = currentExecutionAttempt;
            }
            HashMap hashMap = new HashMap(this.tasksToWaitFor.length);
            for (ExecutionVertex executionVertex : this.tasksToWaitFor) {
                Execution currentExecutionAttempt2 = executionVertex.getCurrentExecutionAttempt();
                if (currentExecutionAttempt2 == null) {
                    LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.", executionVertex.getTaskNameWithSubtaskIndex(), this.job);
                    return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
                }
                hashMap.put(currentExecutionAttempt2.getAttemptId(), executionVertex);
            }
            synchronized (this.triggerLock) {
                try {
                    long andIncrement = this.checkpointIdCounter.getAndIncrement();
                    CheckpointStorageLocation initializeLocationForSavepoint = checkpointProperties.isSavepoint() ? this.checkpointStorage.initializeLocationForSavepoint(andIncrement, str) : this.checkpointStorage.initializeLocationForCheckpoint(andIncrement);
                    PendingCheckpoint pendingCheckpoint = new PendingCheckpoint(this.job, andIncrement, j, hashMap, checkpointProperties, initializeLocationForSavepoint, this.executor);
                    if (this.statsTracker != null) {
                        pendingCheckpoint.setStatsCallback(this.statsTracker.reportPendingCheckpoint(andIncrement, j, checkpointProperties));
                    }
                    Runnable runnable = () -> {
                        synchronized (this.lock) {
                            if (!pendingCheckpoint.isDiscarded()) {
                                LOG.info("Checkpoint {} of job {} expired before completing.", Long.valueOf(andIncrement), this.job);
                                pendingCheckpoint.abortExpired();
                                this.pendingCheckpoints.remove(Long.valueOf(andIncrement));
                                rememberRecentCheckpointId(andIncrement);
                                triggerQueuedRequests();
                            }
                        }
                    };
                    try {
                        synchronized (this.lock) {
                            if (this.shutdown) {
                                return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                            }
                            if (!checkpointProperties.forceCheckpoint()) {
                                if (this.triggerRequestQueued) {
                                    LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", this.job);
                                    return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
                                }
                                if (this.pendingCheckpoints.size() >= this.maxConcurrentCheckpointAttempts) {
                                    this.triggerRequestQueued = true;
                                    if (this.currentPeriodicTrigger != null) {
                                        this.currentPeriodicTrigger.cancel(false);
                                        this.currentPeriodicTrigger = null;
                                    }
                                    return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
                                }
                                long nanoTime2 = ((this.lastCheckpointCompletionNanos + this.minPauseBetweenCheckpointsNanos) - System.nanoTime()) / 1000000;
                                if (nanoTime2 > 0) {
                                    if (this.currentPeriodicTrigger != null) {
                                        this.currentPeriodicTrigger.cancel(false);
                                        this.currentPeriodicTrigger = null;
                                    }
                                    this.currentPeriodicTrigger = this.timer.scheduleAtFixedRate(new ScheduledTrigger(), nanoTime2, this.baseInterval, TimeUnit.MILLISECONDS);
                                    return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
                                }
                            }
                            LOG.info("Triggering checkpoint {} @ {} for job {}.", new Object[]{Long.valueOf(andIncrement), Long.valueOf(j), this.job});
                            this.pendingCheckpoints.put(Long.valueOf(andIncrement), pendingCheckpoint);
                            ScheduledFuture<?> schedule = this.timer.schedule(runnable, this.checkpointTimeout, TimeUnit.MILLISECONDS);
                            if (!pendingCheckpoint.setCancellerHandle(schedule)) {
                                schedule.cancel(false);
                            }
                            Iterator<MasterState> it = MasterHooks.triggerMasterHooks(this.masterHooks.values(), andIncrement, j, this.executor, Time.milliseconds(this.checkpointTimeout)).iterator();
                            while (it.hasNext()) {
                                pendingCheckpoint.addMasterState(it.next());
                            }
                            CheckpointOptions checkpointOptions = new CheckpointOptions(checkpointProperties.getCheckpointType(), initializeLocationForSavepoint.getLocationReference());
                            for (Execution execution : executionArr) {
                                execution.triggerCheckpoint(andIncrement, j, checkpointOptions);
                            }
                            this.numUnsuccessfulCheckpointsTriggers.set(0);
                            return new CheckpointTriggerResult(pendingCheckpoint);
                        }
                    } catch (Throwable th) {
                        synchronized (this.lock) {
                            this.pendingCheckpoints.remove(Long.valueOf(andIncrement));
                            LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)", new Object[]{Long.valueOf(andIncrement), this.job, Integer.valueOf(this.numUnsuccessfulCheckpointsTriggers.incrementAndGet()), th});
                            if (!pendingCheckpoint.isDiscarded()) {
                                pendingCheckpoint.abortError(new Exception("Failed to trigger checkpoint", th));
                            }
                            try {
                                initializeLocationForSavepoint.disposeOnFailure();
                            } catch (Throwable th2) {
                                LOG.warn("Cannot dispose failed checkpoint storage location {}", initializeLocationForSavepoint, th2);
                            }
                            return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                        }
                    }
                } catch (Throwable th3) {
                    LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).", new Object[]{this.job, Integer.valueOf(this.numUnsuccessfulCheckpointsTriggers.incrementAndGet()), th3});
                    return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                }
            }
        }
    }

    public void receiveDeclineMessage(DeclineCheckpoint declineCheckpoint) {
        if (this.shutdown || declineCheckpoint == null) {
            return;
        }
        if (!this.job.equals(declineCheckpoint.getJob())) {
            throw new IllegalArgumentException("Received DeclineCheckpoint message for job " + declineCheckpoint.getJob() + " while this coordinator handles job " + this.job);
        }
        long checkpointId = declineCheckpoint.getCheckpointId();
        String message = declineCheckpoint.getReason() != null ? declineCheckpoint.getReason().getMessage() : "";
        synchronized (this.lock) {
            if (this.shutdown) {
                return;
            }
            PendingCheckpoint remove = this.pendingCheckpoints.remove(Long.valueOf(checkpointId));
            if (remove != null && !remove.isDiscarded()) {
                LOG.info("Decline checkpoint {} by task {} of job {}.", new Object[]{Long.valueOf(checkpointId), declineCheckpoint.getTaskExecutionId(), this.job});
                discardCheckpoint(remove, declineCheckpoint.getReason());
            } else {
                if (remove != null) {
                    throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
                }
                if (LOG.isDebugEnabled()) {
                    if (this.recentPendingCheckpoints.contains(Long.valueOf(checkpointId))) {
                        LOG.debug("Received another decline message for now expired checkpoint attempt {} of job {} : {}", new Object[]{Long.valueOf(checkpointId), this.job, message});
                    } else {
                        LOG.debug("Received decline message for unknown (too old?) checkpoint attempt {} of job {} : {}", new Object[]{Long.valueOf(checkpointId), this.job, message});
                    }
                }
            }
        }
    }

    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint acknowledgeCheckpoint) throws CheckpointException {
        boolean z;
        if (this.shutdown || acknowledgeCheckpoint == null) {
            return false;
        }
        if (!this.job.equals(acknowledgeCheckpoint.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", this.job, acknowledgeCheckpoint);
            return false;
        }
        long checkpointId = acknowledgeCheckpoint.getCheckpointId();
        synchronized (this.lock) {
            if (this.shutdown) {
                return false;
            }
            PendingCheckpoint pendingCheckpoint = this.pendingCheckpoints.get(Long.valueOf(checkpointId));
            if (pendingCheckpoint == null || pendingCheckpoint.isDiscarded()) {
                if (pendingCheckpoint != null) {
                    throw new IllegalStateException("Received message for discarded but non-removed checkpoint " + checkpointId);
                }
                if (this.recentPendingCheckpoints.contains(Long.valueOf(checkpointId))) {
                    z = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from {} of job {}.", new Object[]{Long.valueOf(checkpointId), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob()});
                } else {
                    LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.", new Object[]{Long.valueOf(checkpointId), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob()});
                    z = false;
                }
                discardSubtaskState(acknowledgeCheckpoint.getJob(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getSubtaskState());
                return z;
            }
            switch (pendingCheckpoint.acknowledgeTask(acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getSubtaskState(), acknowledgeCheckpoint.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.", new Object[]{Long.valueOf(checkpointId), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob()});
                    if (pendingCheckpoint.isFullyAcknowledged()) {
                        completePendingCheckpoint(pendingCheckpoint);
                        break;
                    }
                    break;
                case DUPLICATE:
                    LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.", new Object[]{Long.valueOf(acknowledgeCheckpoint.getCheckpointId()), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob()});
                    break;
                case UNKNOWN:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, because the task's execution attempt id was unknown. Discarding the state handle to avoid lingering state.", new Object[]{Long.valueOf(acknowledgeCheckpoint.getCheckpointId()), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob()});
                    discardSubtaskState(acknowledgeCheckpoint.getJob(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getSubtaskState());
                    break;
                case DISCARDED:
                    LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, because the pending checkpoint had been discarded. Discarding the state handle tp avoid lingering state.", new Object[]{Long.valueOf(acknowledgeCheckpoint.getCheckpointId()), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getJob()});
                    discardSubtaskState(acknowledgeCheckpoint.getJob(), acknowledgeCheckpoint.getTaskExecutionId(), acknowledgeCheckpoint.getCheckpointId(), acknowledgeCheckpoint.getSubtaskState());
                    break;
            }
            return true;
        }
    }

    private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
        long checkpointId = pendingCheckpoint.getCheckpointId();
        this.sharedStateRegistry.registerAll(pendingCheckpoint.getOperatorStates().values());
        try {
            try {
                final CompletedCheckpoint finalizeCheckpoint = pendingCheckpoint.finalizeCheckpoint();
                Preconditions.checkState(pendingCheckpoint.isDiscarded() && finalizeCheckpoint != null);
                try {
                    this.completedCheckpointStore.addCheckpoint(finalizeCheckpoint);
                    rememberRecentCheckpointId(checkpointId);
                    dropSubsumedCheckpoints(checkpointId);
                    this.lastCheckpointCompletionNanos = System.nanoTime();
                    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", new Object[]{Long.valueOf(checkpointId), this.job, Long.valueOf(finalizeCheckpoint.getStateSize()), Long.valueOf(finalizeCheckpoint.getDuration())});
                    if (LOG.isDebugEnabled()) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Checkpoint state: ");
                        Iterator<OperatorState> it = finalizeCheckpoint.getOperatorStates().values().iterator();
                        while (it.hasNext()) {
                            sb.append(it.next());
                            sb.append(", ");
                        }
                        sb.setLength(sb.length() - 2);
                        LOG.debug(sb.toString());
                    }
                    long timestamp = finalizeCheckpoint.getTimestamp();
                    for (ExecutionVertex executionVertex : this.tasksToCommitTo) {
                        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
                        if (currentExecutionAttempt != null) {
                            currentExecutionAttempt.notifyCheckpointComplete(checkpointId, timestamp);
                        }
                    }
                } catch (Exception e) {
                    this.executor.execute(new Runnable() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinator.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                finalizeCheckpoint.discardOnFailedStoring();
                            } catch (Throwable th) {
                                CheckpointCoordinator.LOG.warn("Could not properly discard completed checkpoint {}.", Long.valueOf(finalizeCheckpoint.getCheckpointID()), th);
                            }
                        }
                    });
                    throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', e);
                }
            } catch (Exception e2) {
                if (!pendingCheckpoint.isDiscarded()) {
                    pendingCheckpoint.abortError(e2);
                }
                throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e2);
            }
        } finally {
            this.pendingCheckpoints.remove(Long.valueOf(checkpointId));
            triggerQueuedRequests();
        }
    }

    public void failUnacknowledgedPendingCheckpointsFor(ExecutionAttemptID executionAttemptID, Throwable th) {
        synchronized (this.lock) {
            Iterator<PendingCheckpoint> it = this.pendingCheckpoints.values().iterator();
            while (it.hasNext()) {
                PendingCheckpoint next = it.next();
                if (!next.isAcknowledgedBy(executionAttemptID)) {
                    it.remove();
                    discardCheckpoint(next, th);
                }
            }
        }
    }

    private void rememberRecentCheckpointId(long j) {
        if (this.recentPendingCheckpoints.size() >= 16) {
            this.recentPendingCheckpoints.removeFirst();
        }
        this.recentPendingCheckpoints.addLast(Long.valueOf(j));
    }

    private void dropSubsumedCheckpoints(long j) {
        Iterator<Map.Entry<Long, PendingCheckpoint>> it = this.pendingCheckpoints.entrySet().iterator();
        while (it.hasNext()) {
            PendingCheckpoint value = it.next().getValue();
            if (value.getCheckpointId() < j && value.canBeSubsumed()) {
                rememberRecentCheckpointId(value.getCheckpointId());
                value.abortSubsumed();
                it.remove();
            }
        }
    }

    private void triggerQueuedRequests() {
        if (this.triggerRequestQueued) {
            this.triggerRequestQueued = false;
            if (!this.periodicScheduling) {
                this.timer.execute(new ScheduledTrigger());
                return;
            }
            if (this.currentPeriodicTrigger != null) {
                this.currentPeriodicTrigger.cancel(false);
            }
            this.currentPeriodicTrigger = this.timer.scheduleAtFixedRate(new ScheduledTrigger(), 0L, this.baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    @VisibleForTesting
    int getNumScheduledTasks() {
        return this.timer.getQueue().size();
    }

    public boolean restoreLatestCheckpointedState(Map<JobVertexID, ExecutionJobVertex> map, boolean z, boolean z2) throws Exception {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalStateException("CheckpointCoordinator is shut down");
            }
            this.sharedStateRegistry.close();
            this.sharedStateRegistry = this.sharedStateRegistryFactory.create(this.executor);
            this.completedCheckpointStore.recover();
            Iterator<CompletedCheckpoint> it = this.completedCheckpointStore.getAllCheckpoints().iterator();
            while (it.hasNext()) {
                it.next().registerSharedStatesAfterRestored(this.sharedStateRegistry);
            }
            LOG.debug("Status of the shared state registry of job {} after restore: {}.", this.job, this.sharedStateRegistry);
            CompletedCheckpoint latestCheckpoint = this.completedCheckpointStore.getLatestCheckpoint();
            if (latestCheckpoint == null) {
                if (z) {
                    throw new IllegalStateException("No completed checkpoint available");
                }
                LOG.debug("Resetting the master hooks.");
                MasterHooks.reset(this.masterHooks.values(), LOG);
                return false;
            }
            LOG.info("Restoring job {} from latest valid checkpoint: {}.", this.job, latestCheckpoint);
            new StateAssignmentOperation(latestCheckpoint.getCheckpointID(), map, latestCheckpoint.getOperatorStates(), z2).assignStates();
            MasterHooks.restoreMasterHooks(this.masterHooks, latestCheckpoint.getMasterHookStates(), latestCheckpoint.getCheckpointID(), z2, LOG);
            if (this.statsTracker != null) {
                this.statsTracker.reportRestoredCheckpoint(new RestoredCheckpointStats(latestCheckpoint.getCheckpointID(), latestCheckpoint.getProperties(), System.currentTimeMillis(), latestCheckpoint.getExternalPointer()));
            }
            return true;
        }
    }

    public boolean restoreSavepoint(String str, boolean z, Map<JobVertexID, ExecutionJobVertex> map, ClassLoader classLoader) throws Exception {
        Preconditions.checkNotNull(str, "The savepoint path cannot be null.");
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = this.job;
        objArr[1] = str;
        objArr[2] = z ? "allowing non restored state" : "";
        logger.info("Starting job {} from savepoint {} ({})", objArr);
        CompletedCheckpoint loadAndValidateCheckpoint = Checkpoints.loadAndValidateCheckpoint(this.job, map, this.checkpointStorage.resolveCheckpoint(str), classLoader, z);
        this.completedCheckpointStore.addCheckpoint(loadAndValidateCheckpoint);
        long checkpointID = loadAndValidateCheckpoint.getCheckpointID() + 1;
        this.checkpointIdCounter.setCount(checkpointID);
        LOG.info("Reset the checkpoint ID of job {} to {}.", this.job, Long.valueOf(checkpointID));
        return restoreLatestCheckpointedState(map, true, z);
    }

    public int getNumberOfPendingCheckpoints() {
        return this.pendingCheckpoints.size();
    }

    public int getNumberOfRetainedSuccessfulCheckpoints() {
        int numberOfRetainedCheckpoints;
        synchronized (this.lock) {
            numberOfRetainedCheckpoints = this.completedCheckpointStore.getNumberOfRetainedCheckpoints();
        }
        return numberOfRetainedCheckpoints;
    }

    public Map<Long, PendingCheckpoint> getPendingCheckpoints() {
        HashMap hashMap;
        synchronized (this.lock) {
            hashMap = new HashMap(this.pendingCheckpoints);
        }
        return hashMap;
    }

    public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
        List<CompletedCheckpoint> allCheckpoints;
        synchronized (this.lock) {
            allCheckpoints = this.completedCheckpointStore.getAllCheckpoints();
        }
        return allCheckpoints;
    }

    public CheckpointStorage getCheckpointStorage() {
        return this.checkpointStorage;
    }

    public CompletedCheckpointStore getCheckpointStore() {
        return this.completedCheckpointStore;
    }

    public CheckpointIDCounter getCheckpointIdCounter() {
        return this.checkpointIdCounter;
    }

    public long getCheckpointTimeout() {
        return this.checkpointTimeout;
    }

    public boolean isPeriodicCheckpointingConfigured() {
        return this.baseInterval != Long.MAX_VALUE;
    }

    public void startCheckpointScheduler() {
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            stopCheckpointScheduler();
            this.periodicScheduling = true;
            this.currentPeriodicTrigger = this.timer.scheduleAtFixedRate(new ScheduledTrigger(), ThreadLocalRandom.current().nextLong(this.minPauseBetweenCheckpointsNanos / 1000000, this.baseInterval + 1), this.baseInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void stopCheckpointScheduler() {
        synchronized (this.lock) {
            this.triggerRequestQueued = false;
            this.periodicScheduling = false;
            if (this.currentPeriodicTrigger != null) {
                this.currentPeriodicTrigger.cancel(false);
                this.currentPeriodicTrigger = null;
            }
            Iterator<PendingCheckpoint> it = this.pendingCheckpoints.values().iterator();
            while (it.hasNext()) {
                it.next().abortError(new Exception("Checkpoint Coordinator is suspending."));
            }
            this.pendingCheckpoints.clear();
            this.numUnsuccessfulCheckpointsTriggers.set(0);
        }
    }

    public JobStatusListener createActivatorDeactivator() {
        JobStatusListener jobStatusListener;
        synchronized (this.lock) {
            if (this.shutdown) {
                throw new IllegalArgumentException("Checkpoint coordinator is shut down");
            }
            if (this.jobStatusListener == null) {
                this.jobStatusListener = new CheckpointCoordinatorDeActivator(this);
            }
            jobStatusListener = this.jobStatusListener;
        }
        return jobStatusListener;
    }

    private void discardCheckpoint(PendingCheckpoint pendingCheckpoint, @Nullable Throwable th) {
        if (!$assertionsDisabled && !Thread.holdsLock(this.lock)) {
            throw new AssertionError();
        }
        Preconditions.checkNotNull(pendingCheckpoint);
        long checkpointId = pendingCheckpoint.getCheckpointId();
        LOG.info("Discarding checkpoint {} of job {} because: {}", new Object[]{Long.valueOf(checkpointId), this.job, th != null ? th.getMessage() : ""});
        pendingCheckpoint.abortDeclined();
        rememberRecentCheckpointId(checkpointId);
        boolean z = false;
        Iterator<PendingCheckpoint> it = this.pendingCheckpoints.values().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            PendingCheckpoint next = it.next();
            if (!next.isDiscarded() && next.getCheckpointId() >= pendingCheckpoint.getCheckpointId()) {
                z = true;
                break;
            }
        }
        if (z) {
            return;
        }
        triggerQueuedRequests();
    }

    private void discardSubtaskState(final JobID jobID, final ExecutionAttemptID executionAttemptID, final long j, final TaskStateSnapshot taskStateSnapshot) {
        if (taskStateSnapshot != null) {
            this.executor.execute(new Runnable() { // from class: org.apache.flink.runtime.checkpoint.CheckpointCoordinator.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        taskStateSnapshot.discardState();
                    } catch (Throwable th) {
                        CheckpointCoordinator.LOG.warn("Could not properly discard state object of checkpoint {} belonging to task {} of job {}.", new Object[]{Long.valueOf(j), executionAttemptID, jobID, th});
                    }
                }
            });
        }
    }

    static {
        $assertionsDisabled = !CheckpointCoordinator.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
    }
}
