package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/messaging/servicebus/SynchronousMessageSubscriber.class */
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessage> {
    private volatile Throwable disposalReason;
    private final ServiceBusReceiverAsyncClient asyncClient;
    private final boolean isPrefetchDisabled;
    private final Duration operationTimeout;
    private final boolean isReceiveDeleteMode;
    private volatile SynchronousReceiveWork currentWork;
    private volatile long requested;
    private volatile Subscription upstream;
    private static final ClientLogger LOGGER = new ClientLogger(SynchronousMessageSubscriber.class);
    private static final RuntimeException CLIENT_TERMINATED_ERROR = new RuntimeException("The receiver client is terminated. Re-create the client to continue receive attempt.");
    private static final AtomicLongFieldUpdater<SynchronousMessageSubscriber> REQUESTED = AtomicLongFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, ServiceBusConstants.NUMBER_OF_REQUESTED_MESSAGES_KEY);
    private static final AtomicReferenceFieldUpdater<SynchronousMessageSubscriber, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class, "upstream");
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicInteger wip = new AtomicInteger();
    private final ConcurrentLinkedQueue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedDeque<ServiceBusReceivedMessage> bufferMessages = new ConcurrentLinkedDeque<>();
    private final Object currentWorkLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SynchronousMessageSubscriber(ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient, SynchronousReceiveWork synchronousReceiveWork, boolean z, Duration duration) {
        this.asyncClient = (ServiceBusReceiverAsyncClient) Objects.requireNonNull(serviceBusReceiverAsyncClient, "'asyncClient' cannot be null.");
        this.operationTimeout = (Duration) Objects.requireNonNull(duration, "'operationTimeout' cannot be null.");
        this.workQueue.add((SynchronousReceiveWork) Objects.requireNonNull(synchronousReceiveWork, "'initialWork' cannot be null."));
        this.isPrefetchDisabled = z;
        this.isReceiveDeleteMode = serviceBusReceiverAsyncClient.getReceiverOptions().getReceiveMode() == ServiceBusReceiveMode.RECEIVE_AND_DELETE;
        if (synchronousReceiveWork.getNumberOfEvents() < 1) {
            throw LOGGER.logExceptionAsError(new IllegalArgumentException("'numberOfEvents' cannot be less than 1. Actual: " + synchronousReceiveWork.getNumberOfEvents()));
        }
        Operators.addCap(REQUESTED, this, synchronousReceiveWork.getNumberOfEvents());
    }

    protected void hookOnSubscribe(Subscription subscription) {
        if (!Operators.setOnce(UPSTREAM, this, subscription)) {
            LOGGER.warning("This should only be subscribed to once. Ignoring subscription.");
        } else {
            getOrUpdateCurrentWork();
            subscription.request(REQUESTED.get(this));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void hookOnNext(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        if (isTerminated()) {
            Operators.onNextDropped(serviceBusReceivedMessage, Context.empty());
        } else {
            this.bufferMessages.add(serviceBusReceivedMessage);
            drain();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void queueWork(SynchronousReceiveWork synchronousReceiveWork) {
        Objects.requireNonNull(synchronousReceiveWork, "'work' cannot be null");
        if (isTerminated()) {
            Throwable th = this.disposalReason;
            if (th == null) {
                th = CLIENT_TERMINATED_ERROR;
            }
            synchronousReceiveWork.complete("The receiver client is terminated. Re-create the client to continue receive attempt.", th);
            return;
        }
        this.workQueue.add(synchronousReceiveWork);
        LoggingEventBuilder addKeyValue = LOGGER.atVerbose().addKeyValue(ServiceBusConstants.WORK_ID_KEY, synchronousReceiveWork.getId()).addKeyValue("numberOfEvents", synchronousReceiveWork.getNumberOfEvents()).addKeyValue("timeout", synchronousReceiveWork.getTimeout());
        if (this.workQueue.peek() == synchronousReceiveWork && (this.currentWork == null || this.currentWork.isTerminal())) {
            addKeyValue.log("First work in queue. Requesting upstream if needed.");
            getOrUpdateCurrentWork();
        } else {
            addKeyValue.log("Queuing receive work.");
        }
        if (UPSTREAM.get(this) != null) {
            drain();
        }
    }

    private void drain() {
        if (this.wip.getAndIncrement() != 0) {
            return;
        }
        int i = 1;
        while (true) {
            int i2 = i;
            if (i2 == 0) {
                return;
            }
            try {
                drainQueue();
            } finally {
                this.wip.addAndGet(-i2);
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:34:0x008d, code lost:
    
        if (r13 != false) goto L61;
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x0094, code lost:
    
        if (r5.isPrefetchDisabled == false) goto L58;
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x0097, code lost:
    
        r5.asyncClient.release(r0).subscribe((v0) -> { // java.util.function.Consumer.accept(java.lang.Object):void
            lambda$drainQueue$0(v0);
        }, (v1) -> { // java.util.function.Consumer.accept(java.lang.Object):void
            lambda$drainQueue$1(r2, v1);
        }, () -> { // java.lang.Runnable.run():void
            lambda$drainQueue$2(r3);
        });
     */
    /* JADX WARN: Code restructure failed: missing block: B:41:0x00ba, code lost:
    
        r5.bufferMessages.addFirst(r0);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void drainQueue() {
        /*
            Method dump skipped, instructions count: 271
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.azure.messaging.servicebus.SynchronousMessageSubscriber.drainQueue():void");
    }

    protected void hookOnError(Throwable th) {
        dispose("Errors occurred upstream", th);
    }

    protected void hookOnCancel() {
        dispose();
    }

    private boolean isTerminated() {
        if (UPSTREAM.get(this) == Operators.cancelledSubscription()) {
            return true;
        }
        return this.isDisposed.get();
    }

    private SynchronousReceiveWork getOrUpdateCurrentWork() {
        synchronized (this.currentWorkLock) {
            if (this.currentWork != null && !this.currentWork.isTerminal()) {
                return this.currentWork;
            }
            this.currentWork = this.workQueue.poll();
            while (this.currentWork != null && this.currentWork.isTerminal()) {
                LOGGER.atVerbose().addKeyValue(ServiceBusConstants.WORK_ID_KEY, this.currentWork.getId()).addKeyValue("numberOfEvents", this.currentWork.getNumberOfEvents()).log("This work from queue is terminal. Skip it.");
                this.currentWork = this.workQueue.poll();
            }
            if (this.currentWork != null) {
                SynchronousReceiveWork synchronousReceiveWork = this.currentWork;
                LOGGER.atVerbose().addKeyValue(ServiceBusConstants.WORK_ID_KEY, synchronousReceiveWork.getId()).addKeyValue("numberOfEvents", synchronousReceiveWork.getNumberOfEvents()).log("Current work updated.");
                synchronousReceiveWork.start();
                requestUpstream(synchronousReceiveWork.getNumberOfEvents());
            }
            return this.currentWork;
        }
    }

    private void requestUpstream(long j) {
        if (isTerminated()) {
            LOGGER.info("Cannot request more messages upstream. Subscriber is terminated.");
            return;
        }
        Subscription subscription = UPSTREAM.get(this);
        if (subscription == null) {
            LOGGER.info("There is no upstream to request messages from.");
            return;
        }
        long j2 = REQUESTED.get(this);
        long j3 = j - j2;
        LOGGER.atVerbose().addKeyValue(ServiceBusConstants.NUMBER_OF_REQUESTED_MESSAGES_KEY, j2).addKeyValue("numberOfMessages", j).addKeyValue("difference", j3).log("Requesting messages from upstream.");
        if (j3 <= 0) {
            return;
        }
        Operators.addCap(REQUESTED, this, j3);
        subscription.request(j3);
    }

    public void dispose() {
        super.dispose();
        dispose("Upstream completed the receive work.", null);
    }

    private void dispose(String str, Throwable th) {
        super.dispose();
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.disposalReason = th;
        synchronized (this.currentWorkLock) {
            if (this.currentWork != null) {
                this.currentWork.complete(str, th);
                this.currentWork = null;
            }
            SynchronousReceiveWork poll = this.workQueue.poll();
            while (poll != null) {
                poll.complete(str, th);
                poll = this.workQueue.poll();
            }
        }
    }

    int getWorkQueueSize() {
        return this.workQueue.size();
    }
}
