package software.amazon.awssdk.transfer.s3.internal;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.DemandIgnoringSubscription;
import software.amazon.awssdk.utils.async.StoringSubscriber;

@SdkInternalApi
/* loaded from: input_file:software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.class */
public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
    private static final Logger log = Logger.loggerFor((Class<?>) AsyncBufferingSubscriber.class);
    private final CompletableFuture<?> returnFuture;
    private final Function<T, CompletableFuture<?>> consumer;
    private final int maxConcurrentExecutions;
    private volatile boolean isStreamingDone;
    private Subscription subscription;
    private final AtomicBoolean isDelivering = new AtomicBoolean(false);
    private final AtomicInteger numRequestsInFlight = new AtomicInteger(0);
    private final StoringSubscriber<T> storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE);

    public AsyncBufferingSubscriber(Function<T, CompletableFuture<?>> function, CompletableFuture<Void> completableFuture, int i) {
        this.returnFuture = completableFuture;
        this.consumer = function;
        this.maxConcurrentExecutions = i;
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        Validate.paramNotNull(subscription, "subscription");
        if (this.subscription != null) {
            log.warn(() -> {
                return "The subscriber has already been subscribed. Cancelling the incoming subscription";
            });
            subscription.cancel();
        } else {
            this.storingSubscriber.onSubscribe(new DemandIgnoringSubscription(subscription));
            this.subscription = subscription;
            subscription.request(this.maxConcurrentExecutions);
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(T t) {
        this.storingSubscriber.onNext(t);
        flushBufferIfNeeded();
    }

    private void flushBufferIfNeeded() {
        if (this.isDelivering.compareAndSet(false, true)) {
            try {
                Optional<StoringSubscriber.Event<T>> peek = this.storingSubscriber.peek();
                while (true) {
                    if (this.numRequestsInFlight.get() < this.maxConcurrentExecutions) {
                        if (peek.isPresent()) {
                            switch (peek.get().type()) {
                                case ON_COMPLETE:
                                    handleCompleteEvent();
                                    break;
                                case ON_ERROR:
                                    handleError(peek.get().runtimeError());
                                    break;
                                case ON_NEXT:
                                    handleOnNext(peek.get().value());
                                    break;
                                default:
                                    handleError(new IllegalStateException("Unknown stored type: " + peek.get().type()));
                                    break;
                            }
                            peek = this.storingSubscriber.peek();
                        } else {
                            this.subscription.request(1L);
                        }
                    }
                }
            } finally {
                this.isDelivering.set(false);
            }
        }
    }

    private void handleOnNext(T t) {
        this.storingSubscriber.poll();
        int incrementAndGet = this.numRequestsInFlight.incrementAndGet();
        log.debug(() -> {
            return "Delivering next item, numRequestInFlight=" + incrementAndGet;
        });
        this.consumer.apply(t).whenComplete((obj, th) -> {
            this.numRequestsInFlight.decrementAndGet();
            if (this.isStreamingDone) {
                flushBufferIfNeeded();
            } else {
                this.subscription.request(1L);
            }
        });
    }

    private void handleCompleteEvent() {
        if (this.numRequestsInFlight.get() == 0) {
            this.returnFuture.complete(null);
            this.storingSubscriber.poll();
        }
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        handleError(th);
        this.storingSubscriber.onError(th);
    }

    private void handleError(Throwable th) {
        this.returnFuture.completeExceptionally(th);
        this.storingSubscriber.poll();
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        this.isStreamingDone = true;
        this.storingSubscriber.onComplete();
        flushBufferIfNeeded();
    }

    public int numRequestsInFlight() {
        return this.numRequestsInFlight.get();
    }
}
