package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.FluxProcessor;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessor.class */
public class AmqpReceiveLinkProcessor extends FluxProcessor<AmqpReceiveLink, Message> implements Subscription {
    private static final int MINIMUM_REQUEST = 0;
    private static final int MAXIMUM_REQUEST = 100;
    private final ClientLogger logger = new ClientLogger(AmqpReceiveLinkProcessor.class);
    private final Object lock = new Object();
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final AtomicBoolean hasDownstream = new AtomicBoolean();
    private final AtomicInteger retryAttempts = new AtomicInteger();
    private final AtomicBoolean isRequested = new AtomicBoolean();
    private final AtomicInteger linkCreditRequest = new AtomicInteger(1);
    private final int prefetch;
    private final AmqpRetryPolicy retryPolicy;
    private Disposable parentConnection;
    private volatile Subscription upstream;
    private volatile CoreSubscriber<? super Message> downstream;
    private volatile Throwable lastError;
    private volatile AmqpReceiveLink currentLink;
    private volatile Disposable currentLinkSubscriptions;
    private volatile Disposable retrySubscription;

    public AmqpReceiveLinkProcessor(int i, AmqpRetryPolicy amqpRetryPolicy, Disposable disposable) {
        this.retryPolicy = (AmqpRetryPolicy) Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");
        this.parentConnection = (Disposable) Objects.requireNonNull(disposable, "'parentConnection' cannot be null.");
        if (i < 0) {
            throw this.logger.logExceptionAsError(new IllegalArgumentException("'prefetch' cannot be less than 0."));
        }
        this.prefetch = i;
    }

    public Throwable getError() {
        return this.lastError;
    }

    public boolean isTerminated() {
        return this.isTerminated.get();
    }

    public void onSubscribe(Subscription subscription) {
        Objects.requireNonNull(subscription, "'subscription' cannot be null");
        if (isTerminated()) {
            return;
        }
        this.logger.verbose("Subscribing to upstream.", new Object[MINIMUM_REQUEST]);
        this.upstream = subscription;
        subscription.request(0L);
    }

    public void onNext(AmqpReceiveLink amqpReceiveLink) {
        AmqpReceiveLink amqpReceiveLink2;
        Disposable disposable;
        Objects.requireNonNull(amqpReceiveLink, "'next' cannot be null.");
        if (isTerminated()) {
            this.logger.warning("Got another link when we have already terminated processor. Link: {}", new Object[]{amqpReceiveLink.getEntityPath()});
            return;
        }
        this.logger.info("Setting next AMQP receive link.", new Object[MINIMUM_REQUEST]);
        synchronized (this.lock) {
            amqpReceiveLink2 = this.currentLink;
            disposable = this.currentLinkSubscriptions;
            this.currentLink = amqpReceiveLink;
            amqpReceiveLink.addCredits(this.prefetch);
            amqpReceiveLink.setEmptyCreditListener(() -> {
                if (this.hasDownstream.get()) {
                    return Integer.valueOf(this.linkCreditRequest.get());
                }
                this.logger.verbose("Emitter has no downstream subscribers. Not adding credits.", new Object[MINIMUM_REQUEST]);
                return Integer.valueOf(MINIMUM_REQUEST);
            });
            this.currentLinkSubscriptions = Disposables.composite(new Disposable[]{amqpReceiveLink.getEndpointStates().subscribe(amqpEndpointState -> {
                if (amqpEndpointState == AmqpEndpointState.ACTIVE) {
                    this.retryAttempts.set(MINIMUM_REQUEST);
                }
            }, th -> {
                this.currentLink = null;
                onError(th);
            }, () -> {
                if (this.parentConnection.isDisposed()) {
                    this.logger.info("Parent connection is disposed.", new Object[MINIMUM_REQUEST]);
                    return;
                }
                if (isTerminated()) {
                    this.logger.info("Processor is disposed.", new Object[MINIMUM_REQUEST]);
                    return;
                }
                this.logger.info("Receive link endpoint states are closed.", new Object[MINIMUM_REQUEST]);
                AmqpReceiveLink amqpReceiveLink3 = this.currentLink;
                this.currentLink = null;
                if (amqpReceiveLink3 != null) {
                    amqpReceiveLink3.dispose();
                }
                requestUpstream();
            }), amqpReceiveLink.receive().subscribe(message -> {
                this.logger.verbose("Pushing next message downstream.", new Object[MINIMUM_REQUEST]);
                this.downstream.onNext(message);
            })});
        }
        if (amqpReceiveLink2 != null) {
            amqpReceiveLink2.dispose();
        }
        if (disposable != null) {
            disposable.dispose();
        }
        this.isRequested.set(false);
    }

    public void subscribe(CoreSubscriber<? super Message> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'actual' cannot be null.");
        if (isTerminated()) {
            this.logger.info("AmqpReceiveLink is already terminated.", new Object[MINIMUM_REQUEST]);
            coreSubscriber.onSubscribe(Operators.emptySubscription());
            if (hasError()) {
                coreSubscriber.onError(this.lastError);
                return;
            } else {
                coreSubscriber.onComplete();
                return;
            }
        }
        if (this.hasDownstream.getAndSet(true)) {
            Operators.error(coreSubscriber, this.logger.logExceptionAsError(new IllegalStateException("There is already one downstream subscriber.'")));
            return;
        }
        this.downstream = coreSubscriber;
        coreSubscriber.onSubscribe(this);
        requestUpstream();
    }

    public void onError(Throwable th) {
        Objects.requireNonNull(th, "'throwable' is required.");
        if (isTerminated()) {
            this.logger.info("AmqpReceiveLinkProcessor is terminated. Not reopening on error.", new Object[MINIMUM_REQUEST]);
            return;
        }
        int incrementAndGet = this.retryAttempts.incrementAndGet();
        Duration calculateRetryDelay = this.retryPolicy.calculateRetryDelay(th, incrementAndGet);
        if (calculateRetryDelay != null && !this.parentConnection.isDisposed()) {
            this.logger.warning("Transient error occurred. Attempt: {}. Retrying after {} ms.", new Object[]{Integer.valueOf(incrementAndGet), Long.valueOf(calculateRetryDelay.toMillis()), th});
            this.retrySubscription = Mono.delay(calculateRetryDelay).subscribe(l -> {
                requestUpstream();
            });
            return;
        }
        if (this.parentConnection.isDisposed()) {
            this.logger.info("Parent connection is disposed. Not reopening on error.", new Object[MINIMUM_REQUEST]);
        }
        this.logger.warning("Non-retryable error occurred in AMQP receive link.", new Object[]{th});
        this.lastError = th;
        this.isTerminated.set(true);
        synchronized (this.lock) {
            if (this.downstream != null) {
                this.downstream.onError(th);
            }
        }
        terminate();
    }

    public void onComplete() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        if (this.hasDownstream.get()) {
            this.downstream.onComplete();
        }
        terminate();
    }

    public void request(long j) {
        if (this.isTerminated.get()) {
            this.logger.info("Cannot request more from AMQP link processor that is disposed.", new Object[MINIMUM_REQUEST]);
        } else {
            if (j < 0) {
                this.logger.warning(Messages.REQUEST_VALUE_NOT_VALID, new Object[]{Integer.valueOf(MINIMUM_REQUEST), Integer.valueOf(MAXIMUM_REQUEST)});
                return;
            }
            int i = j > 100 ? MAXIMUM_REQUEST : (int) j;
            this.logger.verbose("Back pressure request. Old value: {}. New value: {}", new Object[]{Integer.valueOf(this.linkCreditRequest.get()), Integer.valueOf(i)});
            this.linkCreditRequest.set(i);
        }
    }

    public void cancel() {
        if (this.isTerminated.getAndSet(true)) {
            return;
        }
        if (this.hasDownstream.get()) {
            this.downstream.onComplete();
        }
        terminate();
    }

    private void requestUpstream() {
        if (isTerminated()) {
            this.logger.verbose("Terminated. Not requesting another.", new Object[MINIMUM_REQUEST]);
            return;
        }
        synchronized (this.lock) {
            if (this.currentLink != null) {
                this.logger.info("AmqpReceiveLink exists, not requesting another.", new Object[MINIMUM_REQUEST]);
                return;
            }
            if (this.upstream == null) {
                this.logger.verbose("There is no upstream. Not requesting", new Object[MINIMUM_REQUEST]);
            } else if (this.isRequested.getAndSet(true)) {
                this.logger.info("AmqpRecieveLink already requested.", new Object[MINIMUM_REQUEST]);
            } else {
                this.logger.info("AmqpReceiveLink not requested, yet. Requesting one.", new Object[MINIMUM_REQUEST]);
                this.upstream.request(1L);
            }
        }
    }

    private void terminate() {
        if (this.retrySubscription != null && !this.retrySubscription.isDisposed()) {
            this.retrySubscription.dispose();
        }
        if (this.currentLink != null) {
            this.currentLink.dispose();
        }
        this.currentLink = null;
        if (this.currentLinkSubscriptions != null) {
            this.currentLinkSubscriptions.dispose();
        }
    }
}
