package org.apache.flink.runtime.concurrent;

import akka.dispatch.OnComplete;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.Preconditions;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils.class */
public class FutureUtils {

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$ConjunctFuture.class */
    public static abstract class ConjunctFuture<T> extends CompletableFuture<T> {
        public abstract int getNumFuturesTotal();

        public abstract int getNumFuturesCompleted();
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$Delayer.class */
    private static final class Delayer {
        static final ScheduledThreadPoolExecutor delayer = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory("FlinkCompletableFutureDelayScheduler"));

        private Delayer() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static ScheduledFuture<?> delay(Runnable runnable, long j, TimeUnit timeUnit) {
            Preconditions.checkNotNull(runnable);
            Preconditions.checkNotNull(timeUnit);
            return delayer.schedule(runnable, j, timeUnit);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$ResultConjunctFuture.class */
    public static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
        private final int numTotal;
        private final AtomicInteger nextIndex = new AtomicInteger(0);
        private final AtomicInteger numCompleted = new AtomicInteger(0);
        private volatile T[] results;

        final void handleCompletedFuture(T t, Throwable th) {
            if (th != null) {
                completeExceptionally(th);
                return;
            }
            this.results[this.nextIndex.getAndIncrement()] = t;
            if (this.numCompleted.incrementAndGet() == this.numTotal) {
                complete(Arrays.asList(this.results));
            }
        }

        ResultConjunctFuture(int i) {
            this.numTotal = i;
            this.results = (T[]) new Object[i];
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesTotal() {
            return this.numTotal;
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$RetryException.class */
    public static class RetryException extends Exception {
        private static final long serialVersionUID = 3613470781274141862L;

        public RetryException(String str) {
            super(str);
        }

        public RetryException(String str, Throwable th) {
            super(str, th);
        }

        public RetryException(Throwable th) {
            super(th);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$Timeout.class */
    private static final class Timeout implements Runnable {
        private final CompletableFuture<?> future;

        private Timeout(CompletableFuture<?> completableFuture) {
            this.future = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        }

        @Override // java.lang.Runnable
        public void run() {
            this.future.completeExceptionally(new TimeoutException());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/concurrent/FutureUtils$WaitingConjunctFuture.class */
    public static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
        private final AtomicInteger numCompleted;
        private final int numTotal;

        private void handleCompletedFuture(Object obj, Throwable th) {
            if (th != null) {
                completeExceptionally(th);
            } else if (this.numTotal == this.numCompleted.incrementAndGet()) {
                complete(null);
            }
        }

        private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> collection) {
            this.numCompleted = new AtomicInteger(0);
            Preconditions.checkNotNull(collection, "Futures must not be null.");
            this.numTotal = collection.size();
            if (collection.isEmpty()) {
                complete(null);
                return;
            }
            Iterator<? extends CompletableFuture<?>> it = collection.iterator();
            while (it.hasNext()) {
                it.next().whenComplete(this::handleCompletedFuture);
            }
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesTotal() {
            return this.numTotal;
        }

        @Override // org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture
        public int getNumFuturesCompleted() {
            return this.numCompleted.get();
        }
    }

    public static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> supplier, int i, Executor executor) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        retryOperation(completableFuture, supplier, i, executor);
        return completableFuture;
    }

    private static <T> void retryOperation(CompletableFuture<T> completableFuture, Supplier<CompletableFuture<T>> supplier, int i, Executor executor) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture<T> completableFuture2 = supplier.get();
        completableFuture2.whenCompleteAsync((BiConsumer) (obj, th) -> {
            if (th == null) {
                completableFuture.complete(obj);
                return;
            }
            if (th instanceof CancellationException) {
                completableFuture.completeExceptionally(new RetryException("Operation future was cancelled.", th));
            } else if (i > 0) {
                retryOperation(completableFuture, supplier, i - 1, executor);
            } else {
                completableFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", th));
            }
        }, executor);
        completableFuture.whenComplete((BiConsumer) (obj2, th2) -> {
            completableFuture2.cancel(false);
        });
    }

    public static <T> CompletableFuture<T> retryWithDelay(Supplier<CompletableFuture<T>> supplier, int i, Time time, ScheduledExecutor scheduledExecutor) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        retryOperationWithDelay(completableFuture, supplier, i, time, scheduledExecutor);
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void retryOperationWithDelay(CompletableFuture<T> completableFuture, Supplier<CompletableFuture<T>> supplier, int i, Time time, ScheduledExecutor scheduledExecutor) {
        if (completableFuture.isDone()) {
            return;
        }
        CompletableFuture<T> completableFuture2 = supplier.get();
        completableFuture2.whenCompleteAsync((BiConsumer) (obj, th) -> {
            if (th == null) {
                completableFuture.complete(obj);
                return;
            }
            if (th instanceof CancellationException) {
                completableFuture.completeExceptionally(new RetryException("Operation future was cancelled.", th));
            } else if (i <= 0) {
                completableFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries has been exhausted.", th));
            } else {
                ScheduledFuture<?> schedule = scheduledExecutor.schedule(() -> {
                    retryOperationWithDelay(completableFuture, supplier, i - 1, time, scheduledExecutor);
                }, time.toMilliseconds(), TimeUnit.MILLISECONDS);
                completableFuture.whenComplete((obj, th) -> {
                    schedule.cancel(false);
                });
            }
        }, (Executor) scheduledExecutor);
        completableFuture.whenComplete((BiConsumer) (obj2, th2) -> {
            completableFuture2.cancel(false);
        });
    }

    public static <T> ConjunctFuture<Collection<T>> combineAll(Collection<? extends CompletableFuture<? extends T>> collection) {
        Preconditions.checkNotNull(collection, "futures");
        ResultConjunctFuture resultConjunctFuture = new ResultConjunctFuture(collection.size());
        if (collection.isEmpty()) {
            resultConjunctFuture.complete(Collections.emptyList());
        } else {
            for (CompletableFuture<? extends T> completableFuture : collection) {
                resultConjunctFuture.getClass();
                completableFuture.whenComplete(resultConjunctFuture::handleCompletedFuture);
            }
        }
        return resultConjunctFuture;
    }

    public static ConjunctFuture<Void> waitForAll(Collection<? extends CompletableFuture<?>> collection) {
        Preconditions.checkNotNull(collection, "futures");
        return new WaitingConjunctFuture(collection);
    }

    public static <T> CompletableFuture<T> completedExceptionally(Throwable th) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(th);
        return completableFuture;
    }

    public static FiniteDuration toFiniteDuration(Time time) {
        return new FiniteDuration(time.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    public static Time toTime(FiniteDuration finiteDuration) {
        return Time.milliseconds(finiteDuration.toMillis());
    }

    public static <T> CompletableFuture<T> toJava(Future<T> future) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<>();
        future.onComplete(new OnComplete<T>() { // from class: org.apache.flink.runtime.concurrent.FutureUtils.1
            @Override // akka.dispatch.OnComplete
            public void onComplete(Throwable th, T t) throws Throwable {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    completableFuture.complete(t);
                }
            }
        }, Executors.directExecutionContext());
        return completableFuture;
    }

    public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> completableFuture, long j, TimeUnit timeUnit) {
        ScheduledFuture delay = Delayer.delay(new Timeout(completableFuture), j, timeUnit);
        completableFuture.whenComplete((BiConsumer) (obj, th) -> {
            if (delay.isDone()) {
                return;
            }
            delay.cancel(false);
        });
        return completableFuture;
    }
}
