/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransaction;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.CreditFlowMode;
import com.azure.core.amqp.implementation.MessageFlux;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannelClosedException;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.StringUtil;
import com.azure.core.amqp.implementation.handler.DeliveryNotOnLinkException;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.AutoDispositionLockRenew;
import com.azure.messaging.servicebus.ConnectionCacheWrapper;
import com.azure.messaging.servicebus.FluxAutoComplete;
import com.azure.messaging.servicebus.FluxAutoLockRenew;
import com.azure.messaging.servicebus.FluxTrace;
import com.azure.messaging.servicebus.IServiceBusSessionManager;
import com.azure.messaging.servicebus.LockRenewalOperation;
import com.azure.messaging.servicebus.ReceiverOptions;
import com.azure.messaging.servicebus.ServiceBusAsyncConsumer;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.ServiceBusErrorSource;
import com.azure.messaging.servicebus.ServiceBusException;
import com.azure.messaging.servicebus.ServiceBusMessageContext;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusSingleSessionManager;
import com.azure.messaging.servicebus.ServiceBusTransactionContext;
import com.azure.messaging.servicebus.implementation.DispositionStatus;
import com.azure.messaging.servicebus.implementation.LockContainer;
import com.azure.messaging.servicebus.implementation.Messages;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLinkProcessor;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusTracer;
import com.azure.messaging.servicebus.models.AbandonOptions;
import com.azure.messaging.servicebus.models.CompleteOptions;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.DeferOptions;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;

@ServiceClient(builder=ServiceBusClientBuilder.class, isAsync=true)
public final class ServiceBusReceiverAsyncClient
implements AutoCloseable {
    private static final Duration EXPIRED_RENEWAL_CLEANUP_INTERVAL = Duration.ofMinutes(2L);
    private static final DeadLetterOptions DEFAULT_DEAD_LETTER_OPTIONS = new DeadLetterOptions();
    private static final String TRANSACTION_LINK_NAME = "coordinator";
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReceiverAsyncClient.class);
    private final LockContainer<LockRenewalOperation> renewalContainer;
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final LockContainer<OffsetDateTime> managementNodeLocks;
    private final String fullyQualifiedNamespace;
    private final String entityPath;
    private final MessagingEntityType entityType;
    private final ReceiverOptions receiverOptions;
    private final ConnectionCacheWrapper connectionCacheWrapper;
    private final boolean isOnV2;
    private final Mono<ServiceBusAmqpConnection> connectionProcessor;
    private final ServiceBusReceiverInstrumentation instrumentation;
    private final ServiceBusTracer tracer;
    private final MessageSerializer messageSerializer;
    private final Runnable onClientClose;
    private final IServiceBusSessionManager sessionManager;
    private final boolean isSessionEnabled;
    private final Semaphore completionLock = new Semaphore(1);
    private final String identifier;
    private final AtomicLong lastPeekedSequenceNumber = new AtomicLong(-1L);
    private final AtomicReference<ServiceBusAsyncConsumer> consumer = new AtomicReference();
    private final AutoCloseable trackSettlementSequenceNumber;

    ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ConnectionCacheWrapper connectionCacheWrapper, Duration cleanupInterval, ServiceBusReceiverInstrumentation instrumentation, MessageSerializer messageSerializer, Runnable onClientClose, String identifier) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveOptions cannot be null.'");
        this.connectionCacheWrapper = Objects.requireNonNull(connectionCacheWrapper, "'connectionCacheWrapper' cannot be null.");
        this.connectionProcessor = this.connectionCacheWrapper.getConnection();
        this.instrumentation = Objects.requireNonNull(instrumentation, "'tracer' cannot be null");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
        this.sessionManager = null;
        if (receiverOptions.getSessionId() != null || receiverOptions.getMaxConcurrentSessions() != null) {
            throw new IllegalStateException("Session-specific options are not expected to be present on a client for session unaware entity.");
        }
        this.isSessionEnabled = false;
        this.isOnV2 = this.connectionCacheWrapper.isV2();
        this.managementNodeLocks = new LockContainer(cleanupInterval);
        Consumer<LockRenewalOperation> onExpired = renewal -> {
            LOGGER.atVerbose().addKeyValue("lockToken", renewal.getLockToken()).addKeyValue("status", (Object)renewal.getStatus()).log("Closing expired renewal operation.", new Object[]{renewal.getThrowable()});
            renewal.close();
        };
        this.renewalContainer = new LockContainer<LockRenewalOperation>(EXPIRED_RENEWAL_CLEANUP_INTERVAL, onExpired);
        this.identifier = identifier;
        this.tracer = instrumentation.getTracer();
        this.trackSettlementSequenceNumber = instrumentation.startTrackingSettlementSequenceNumber();
    }

    ServiceBusReceiverAsyncClient(String fullyQualifiedNamespace, String entityPath, MessagingEntityType entityType, ReceiverOptions receiverOptions, ConnectionCacheWrapper connectionCacheWrapper, Duration cleanupInterval, ServiceBusReceiverInstrumentation instrumentation, MessageSerializer messageSerializer, Runnable onClientClose, IServiceBusSessionManager sessionManager) {
        this.fullyQualifiedNamespace = Objects.requireNonNull(fullyQualifiedNamespace, "'fullyQualifiedNamespace' cannot be null.");
        this.entityPath = Objects.requireNonNull(entityPath, "'entityPath' cannot be null.");
        this.entityType = Objects.requireNonNull(entityType, "'entityType' cannot be null.");
        this.receiverOptions = Objects.requireNonNull(receiverOptions, "'receiveOptions cannot be null.'");
        this.connectionCacheWrapper = Objects.requireNonNull(connectionCacheWrapper, "'connectionCacheWrapper' cannot be null.");
        this.connectionProcessor = this.connectionCacheWrapper.getConnection();
        this.instrumentation = Objects.requireNonNull(instrumentation, "'tracer' cannot be null");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.onClientClose = Objects.requireNonNull(onClientClose, "'onClientClose' cannot be null.");
        this.sessionManager = Objects.requireNonNull(sessionManager, "'sessionManager' cannot be null.");
        this.isSessionEnabled = true;
        this.isOnV2 = this.connectionCacheWrapper.isV2();
        boolean isV2SessionManager = this.sessionManager instanceof ServiceBusSingleSessionManager;
        if (this.isOnV2 ^ isV2SessionManager) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("For V2 Session, the manager should be ServiceBusSingleSessionManager, and ConnectionCache should be on V2."));
        }
        this.managementNodeLocks = new LockContainer(cleanupInterval);
        Consumer<LockRenewalOperation> onExpired = renewal -> {
            LOGGER.atInfo().addKeyValue("sessionId", renewal.getSessionId()).addKeyValue("status", (Object)renewal.getStatus()).log("Closing expired renewal operation.", new Object[]{renewal.getThrowable()});
            renewal.close();
        };
        this.renewalContainer = new LockContainer<LockRenewalOperation>(EXPIRED_RENEWAL_CLEANUP_INTERVAL, onExpired);
        this.identifier = sessionManager.getIdentifier();
        this.tracer = instrumentation.getTracer();
        this.trackSettlementSequenceNumber = instrumentation.startTrackingSettlementSequenceNumber();
    }

    public String getFullyQualifiedNamespace() {
        return this.fullyQualifiedNamespace;
    }

    public String getEntityPath() {
        return this.entityPath;
    }

    public String getSessionId() {
        return this.receiverOptions.getSessionId();
    }

    public String getIdentifier() {
        return this.identifier;
    }

    public Mono<Void> abandon(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.ABANDONED, null, null, null, null);
    }

    public Mono<Void> abandon(ServiceBusReceivedMessage message, AbandonOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'settlementOptions' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.ABANDONED, null, null, options.getPropertiesToModify(), options.getTransactionContext());
    }

    public Mono<Void> complete(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.COMPLETED, null, null, null, null);
    }

    public Mono<Void> complete(ServiceBusReceivedMessage message, CompleteOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.COMPLETED, null, null, null, options.getTransactionContext());
    }

    public Mono<Void> defer(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.DEFERRED, null, null, null, null);
    }

    public Mono<Void> defer(ServiceBusReceivedMessage message, DeferOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.DEFERRED, null, null, options.getPropertiesToModify(), options.getTransactionContext());
    }

    public Mono<Void> deadLetter(ServiceBusReceivedMessage message) {
        return this.deadLetter(message, DEFAULT_DEAD_LETTER_OPTIONS);
    }

    public Mono<Void> deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions options) {
        if (Objects.isNull(options)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options' cannot be null."));
        }
        if (!Objects.isNull(options.getTransactionContext()) && Objects.isNull(options.getTransactionContext().getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'options.transactionContext.transactionId' cannot be null."));
        }
        return this.updateDisposition(message, DispositionStatus.SUSPENDED, options.getDeadLetterReason(), options.getDeadLetterErrorDescription(), options.getPropertiesToModify(), options.getTransactionContext());
    }

    public Mono<byte[]> getSessionState() {
        return this.getSessionState(this.receiverOptions.getSessionId());
    }

    public Mono<ServiceBusReceivedMessage> peekMessage() {
        return this.peekMessage(this.receiverOptions.getSessionId());
    }

    Mono<ServiceBusReceivedMessage> peekMessage(String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peek")));
        }
        Mono result = this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> {
            long sequence = this.lastPeekedSequenceNumber.get() + 1L;
            LOGGER.atVerbose().addKeyValue("sequenceNumber", sequence).log("Peek message.");
            return channel.peek(sequence, sessionId, this.getLinkName(sessionId));
        }).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE)).handle((message, sink) -> {
            long current = this.lastPeekedSequenceNumber.updateAndGet(value -> Math.max(value, message.getSequenceNumber()));
            LOGGER.atVerbose().addKeyValue("sequenceNumber", current).log("Updating last peeked sequence number.");
            sink.next(message);
        });
        return this.tracer.traceManagementReceive("ServiceBus.peekMessage", (Mono<ServiceBusReceivedMessage>)result);
    }

    public Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber) {
        return this.peekMessage(sequenceNumber, this.receiverOptions.getSessionId());
    }

    Mono<ServiceBusReceivedMessage> peekMessage(long sequenceNumber, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekAt")));
        }
        return this.tracer.traceManagementReceive("ServiceBus.peekMessage", (Mono<ServiceBusReceivedMessage>)this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.peek(sequenceNumber, sessionId, this.getLinkName(sessionId))).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE)));
    }

    public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages) {
        return this.tracer.traceSyncReceive("ServiceBus.peekMessages", this.peekMessages(maxMessages, this.receiverOptions.getSessionId()));
    }

    Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatch")));
        }
        if (maxMessages <= 0) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'maxMessages' is not positive."));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMapMany(node -> {
            long nextSequenceNumber = this.lastPeekedSequenceNumber.get() + 1L;
            LOGGER.atVerbose().addKeyValue("sequenceNumber", nextSequenceNumber).log("Peek batch.");
            return node.peek(nextSequenceNumber, sessionId, this.getLinkName(sessionId), maxMessages).doOnNext(next -> {
                long current = this.lastPeekedSequenceNumber.updateAndGet(value -> Math.max(value, next.getSequenceNumber()));
                LOGGER.atVerbose().addKeyValue("sequenceNumber", current).log("Last peeked sequence number in batch.");
            });
        }).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    public Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber) {
        return this.peekMessages(maxMessages, sequenceNumber, this.receiverOptions.getSessionId());
    }

    Flux<ServiceBusReceivedMessage> peekMessages(int maxMessages, long sequenceNumber, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "peekBatchAt")));
        }
        if (maxMessages <= 0) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'maxMessages' is not positive."));
        }
        return this.tracer.traceSyncReceive("ServiceBus.peekMessages", (Flux<ServiceBusReceivedMessage>)this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMapMany(node -> node.peek(sequenceNumber, sessionId, this.getLinkName(sessionId), maxMessages)).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE)));
    }

    public Flux<ServiceBusReceivedMessage> receiveMessages() {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveMessages")));
        }
        if (this.isOnV2) {
            if (this.isSessionEnabled) {
                return this.sessionReactiveReceiveV2();
            }
            return this.nonSessionReactiveReceiveV2();
        }
        return this.receiveMessagesNoBackPressure().limitRate(1, 0);
    }

    Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
        return this.receiveMessagesWithContext(0).handle((serviceBusMessageContext, sink) -> {
            if (serviceBusMessageContext.hasError()) {
                sink.error(serviceBusMessageContext.getThrowable());
                return;
            }
            sink.next((Object)serviceBusMessageContext.getMessage());
        });
    }

    Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
        return this.receiveMessagesWithContext(1);
    }

    private Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
        Flux messageFlux = this.sessionManager != null ? this.sessionManager.receive() : this.getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);
        FluxTrace messageFluxWithTracing = new FluxTrace((Flux<? extends ServiceBusMessageContext>)messageFlux, this.instrumentation);
        FluxOperator withAutoLockRenewal = !this.isSessionEnabled && this.receiverOptions.isAutoLockRenewEnabled() ? new FluxAutoLockRenew((Flux<? extends ServiceBusMessageContext>)messageFluxWithTracing, this.receiverOptions, this.renewalContainer, this::renewMessageLock, this.tracer) : messageFluxWithTracing;
        FluxOperator result = this.receiverOptions.isEnableAutoComplete() ? new FluxAutoComplete((Flux<? extends ServiceBusMessageContext>)withAutoLockRenewal, this.completionLock, context -> context.getMessage() != null ? this.complete(context.getMessage()) : Mono.empty(), context -> context.getMessage() != null ? this.abandon(context.getMessage()) : Mono.empty()) : withAutoLockRenewal;
        if (highTide > 0) {
            result = result.limitRate(highTide, 0);
        }
        return result.onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    public Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber) {
        return this.receiveDeferredMessage(sequenceNumber, this.receiverOptions.getSessionId());
    }

    Mono<ServiceBusReceivedMessage> receiveDeferredMessage(long sequenceNumber, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessage")));
        }
        return this.tracer.traceManagementReceive("ServiceBus.receiveDeferredMessage", (Mono<ServiceBusReceivedMessage>)this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.receiveDeferredMessages(this.receiverOptions.getReceiveMode(), sessionId, this.getLinkName(sessionId), Collections.singleton(sequenceNumber)).last()).map(receivedMessage -> {
            if (CoreUtils.isNullOrEmpty((CharSequence)receivedMessage.getLockToken())) {
                return receivedMessage;
            }
            if (this.receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
                receivedMessage.setLockedUntil(this.managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil(), receivedMessage.getLockedUntil()));
            }
            return receivedMessage;
        }).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE)));
    }

    public Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers) {
        return this.tracer.traceSyncReceive("ServiceBus.receiveDeferredMessages", this.receiveDeferredMessages(sequenceNumbers, this.receiverOptions.getSessionId()));
    }

    Flux<ServiceBusReceivedMessage> receiveDeferredMessages(Iterable<Long> sequenceNumbers, String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "receiveDeferredMessageBatch")));
        }
        if (sequenceNumbers == null) {
            return FluxUtil.fluxError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'sequenceNumbers' cannot be null"));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMapMany(node -> node.receiveDeferredMessages(this.receiverOptions.getReceiveMode(), sessionId, this.getLinkName(sessionId), sequenceNumbers)).map(receivedMessage -> {
            if (CoreUtils.isNullOrEmpty((CharSequence)receivedMessage.getLockToken())) {
                return receivedMessage;
            }
            if (this.receiverOptions.getReceiveMode() == ServiceBusReceiveMode.PEEK_LOCK) {
                receivedMessage.setLockedUntil(this.managementNodeLocks.addOrUpdate(receivedMessage.getLockToken(), receivedMessage.getLockedUntil(), receivedMessage.getLockedUntil()));
            }
            return receivedMessage;
        }).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    Mono<Void> release(ServiceBusReceivedMessage message) {
        return this.updateDisposition(message, DispositionStatus.RELEASED, null, null, null, null);
    }

    public Mono<OffsetDateTime> renewMessageLock(ServiceBusReceivedMessage message) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock")));
        }
        if (this.isSessionEnabled) {
            String errorMessage = "Renewing message lock is an invalid operation when working with sessions.";
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Renewing message lock is an invalid operation when working with sessions."));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        if (Objects.isNull(message.getLockToken())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message.getLockToken()' cannot be null."));
        }
        if (message.getLockToken().isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
        }
        return this.tracer.traceRenewMessageLock(this.renewMessageLock(message.getLockToken()), message).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    Mono<OffsetDateTime> renewMessageLock(String lockToken) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewMessageLock")));
        }
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(serviceBusManagementNode -> serviceBusManagementNode.renewMessageLock(lockToken, this.getLinkName(null))).map(offsetDateTime -> this.isOnV2 ? offsetDateTime : this.managementNodeLocks.addOrUpdate(lockToken, (OffsetDateTime)offsetDateTime, (OffsetDateTime)offsetDateTime));
    }

    public Mono<Void> renewMessageLock(ServiceBusReceivedMessage message, Duration maxLockRenewalDuration) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock")));
        }
        if (this.isSessionEnabled) {
            String errorMessage = "Renewing message lock is an invalid operation when working with sessions.";
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Renewing message lock is an invalid operation when working with sessions."));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        if (Objects.isNull(message.getLockToken())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message.getLockToken()' cannot be null."));
        }
        if (message.getLockToken().isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'message.getLockToken()' cannot be empty."));
        }
        if (maxLockRenewalDuration == null) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'maxLockRenewalDuration' cannot be null."));
        }
        if (maxLockRenewalDuration.isNegative()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }
        LockRenewalOperation operation = new LockRenewalOperation(message.getLockToken(), maxLockRenewalDuration, false, ignored -> this.renewMessageLock(message));
        this.renewalContainer.addOrUpdate(message.getLockToken(), OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
        return operation.getCompletionOperation().onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    public Mono<OffsetDateTime> renewSessionLock() {
        return this.renewSessionLock(this.receiverOptions.getSessionId());
    }

    public Mono<Void> renewSessionLock(Duration maxLockRenewalDuration) {
        return this.renewSessionLock(this.receiverOptions.getSessionId(), maxLockRenewalDuration);
    }

    public Mono<Void> setSessionState(byte[] sessionState) {
        return this.setSessionState(this.receiverOptions.getSessionId(), sessionState);
    }

    public Mono<ServiceBusTransactionContext> createTransaction() {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "createTransaction")));
        }
        return this.tracer.traceMono("ServiceBus.commitTransaction", this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(AmqpSession::createTransaction).map(transaction -> new ServiceBusTransactionContext(transaction.getTransactionId()))).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    public Mono<Void> commitTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "commitTransaction")));
        }
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.tracer.traceMono("ServiceBus.commitTransaction", this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.commitTransaction(new AmqpTransaction(transactionContext.getTransactionId())))).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    public Mono<Void> rollbackTransaction(ServiceBusTransactionContext transactionContext) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "rollbackTransaction")));
        }
        if (Objects.isNull(transactionContext)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext' cannot be null."));
        }
        if (Objects.isNull(transactionContext.getTransactionId())) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'transactionContext.transactionId' cannot be null."));
        }
        return this.tracer.traceMono("ServiceBus.rollbackTransaction", this.connectionProcessor.flatMap(connection -> connection.createSession(TRANSACTION_LINK_NAME)).flatMap(transactionSession -> transactionSession.rollbackTransaction(new AmqpTransaction(transactionContext.getTransactionId())))).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
    }

    @Override
    public void close() {
        if (this.isDisposed.get()) {
            return;
        }
        try {
            boolean acquired = this.completionLock.tryAcquire(5L, TimeUnit.SECONDS);
            if (!acquired) {
                LOGGER.info("Unable to obtain completion lock.");
            }
        }
        catch (InterruptedException e) {
            LOGGER.info("Unable to obtain completion lock.", new Object[]{e});
        }
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        LOGGER.info("Removing receiver links.");
        ServiceBusAsyncConsumer disposed = this.consumer.getAndSet(null);
        if (disposed != null) {
            disposed.close();
        }
        if (this.sessionManager != null) {
            this.sessionManager.close();
        }
        this.managementNodeLocks.close();
        this.renewalContainer.close();
        if (this.trackSettlementSequenceNumber != null) {
            try {
                this.trackSettlementSequenceNumber.close();
            }
            catch (Exception e) {
                LOGGER.info("Unable to close settlement sequence number subscription.", new Object[]{e});
            }
        }
        this.onClientClose.run();
    }

    ReceiverOptions getReceiverOptions() {
        return this.receiverOptions;
    }

    private boolean isManagementToken(String lockToken) {
        return this.managementNodeLocks.containsUnexpired(lockToken);
    }

    private Mono<Void> updateDisposition(ServiceBusReceivedMessage message, DispositionStatus dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, ServiceBusTransactionContext transactionContext) {
        Mono updateDispositionOperation;
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, dispositionStatus.getValue())));
        }
        if (Objects.isNull(message)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'message' cannot be null."));
        }
        String lockToken = message.getLockToken();
        String sessionId = message.getSessionId();
        if (this.receiverOptions.getReceiveMode() != ServiceBusReceiveMode.PEEK_LOCK) {
            return Mono.error((Throwable)LOGGER.logExceptionAsError((RuntimeException)new UnsupportedOperationException(String.format("'%s' is not supported on a receiver opened in ReceiveMode.RECEIVE_AND_DELETE.", new Object[]{dispositionStatus}))));
        }
        if (message.isSettled()) {
            return Mono.error((Throwable)LOGGER.logExceptionAsError((RuntimeException)new IllegalArgumentException("The message has either been deleted or already settled.")));
        }
        if (message.getLockToken() == null) {
            String errorMessage = "This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled.";
            return Mono.error((Throwable)LOGGER.logExceptionAsError((RuntimeException)new UnsupportedOperationException("This operation is not supported for peeked messages. Only messages received using receiveMessages() in PEEK_LOCK mode can be settled.")));
        }
        String sessionIdToUse = sessionId == null && !CoreUtils.isNullOrEmpty((CharSequence)this.receiverOptions.getSessionId()) ? this.receiverOptions.getSessionId() : sessionId;
        LOGGER.atVerbose().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("sessionId", sessionIdToUse).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Disposition started.");
        if (this.isSessionEnabled) {
            updateDispositionOperation = this.isOnV2 ? this.sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext).then(Mono.fromRunnable(() -> {
                LOGGER.atVerbose().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Disposition completed.");
                message.setIsSettled();
            })).onErrorResume(DeliveryNotOnLinkException.class, __ -> {
                LOGGER.info("Could not perform disposition on session manger. Performing on management node.");
                return this.dispositionViaManagementNode(message, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext);
            }) : this.sessionManager.updateDisposition(lockToken, sessionId, dispositionStatus, propertiesToModify, deadLetterReason, deadLetterErrorDescription, transactionContext).flatMap(isSuccess -> {
                if (isSuccess.booleanValue()) {
                    message.setIsSettled();
                    this.renewalContainer.remove(lockToken);
                    return Mono.empty();
                }
                LOGGER.info("Could not perform on session manger. Performing on management node.");
                return this.dispositionViaManagementNode(message, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext);
            });
        } else {
            ServiceBusAsyncConsumer existingConsumer = this.consumer.get();
            updateDispositionOperation = this.isManagementToken(lockToken) || existingConsumer == null ? this.dispositionViaManagementNode(message, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext) : (this.isOnV2 ? existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext).then(Mono.fromRunnable(() -> {
                LOGGER.atVerbose().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Disposition completed.");
                message.setIsSettled();
                this.renewalContainer.remove(lockToken);
            })).onErrorResume(DeliveryNotOnLinkException.class, __ -> this.dispositionViaManagementNode(message, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext)) : existingConsumer.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, transactionContext).then(Mono.fromRunnable(() -> {
                LOGGER.atVerbose().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Disposition completed.");
                message.setIsSettled();
                this.renewalContainer.remove(lockToken);
            })));
        }
        return this.instrumentation.instrumentSettlement(updateDispositionOperation, message, message.getContext(), dispositionStatus).onErrorMap(throwable -> {
            if (throwable instanceof ServiceBusException) {
                return throwable;
            }
            switch (dispositionStatus) {
                case COMPLETED: {
                    return new ServiceBusException((Throwable)throwable, ServiceBusErrorSource.COMPLETE);
                }
                case ABANDONED: {
                    return new ServiceBusException((Throwable)throwable, ServiceBusErrorSource.ABANDON);
                }
            }
            return new ServiceBusException((Throwable)throwable, ServiceBusErrorSource.UNKNOWN);
        });
    }

    private Mono<Void> dispositionViaManagementNode(ServiceBusReceivedMessage message, DispositionStatus dispositionStatus, String deadLetterReason, String deadLetterErrorDescription, Map<String, Object> propertiesToModify, ServiceBusTransactionContext transactionContext) {
        String lockToken = message.getLockToken();
        String sessionId = message.getSessionId();
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(node -> node.updateDisposition(lockToken, dispositionStatus, deadLetterReason, deadLetterErrorDescription, propertiesToModify, sessionId, this.getLinkName(sessionId), transactionContext)).then(Mono.fromRunnable(() -> {
            LOGGER.atInfo().addKeyValue("lockToken", lockToken).addKeyValue("entityPath", this.entityPath).addKeyValue("dispositionStatus", (Object)dispositionStatus).log("Disposition (via management node) completed.");
            message.setIsSettled();
            this.managementNodeLocks.remove(lockToken);
            this.renewalContainer.remove(lockToken);
        }));
    }

    private ServiceBusAsyncConsumer getOrCreateConsumer() {
        ServiceBusAsyncConsumer newConsumer;
        if (this.isSessionEnabled) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalStateException("The ServiceBusAsyncConsumer is expected to work only with session unaware entity."));
        }
        ServiceBusAsyncConsumer existing = this.consumer.get();
        if (existing != null) {
            return existing;
        }
        String linkName = StringUtil.getRandomString((String)this.entityPath);
        LOGGER.atInfo().addKeyValue("linkName", linkName).addKeyValue("entityPath", this.entityPath).log("Creating consumer.");
        Mono receiveLinkMono = this.connectionProcessor.flatMap(connection -> connection.createReceiveLink(linkName, this.entityPath, this.receiverOptions.getReceiveMode(), null, this.entityType, this.identifier)).doOnNext(next -> LOGGER.atVerbose().addKeyValue("linkName", linkName).addKeyValue("entityPath", next.getEntityPath()).addKeyValue("mode", (Object)this.receiverOptions.getReceiveMode()).addKeyValue("isSessionEnabled", false).addKeyValue("entityType", (Object)this.entityType).log("Created consumer for Service Bus resource."));
        Mono retryableReceiveLinkMono = RetryUtil.withRetry((Mono)receiveLinkMono.onErrorMap(RequestResponseChannelClosedException.class, e -> new AmqpException(true, e.getMessage(), (Throwable)e, null)), (AmqpRetryOptions)this.connectionCacheWrapper.getRetryOptions(), (String)("Failed to create receive link " + linkName), (boolean)true);
        Flux receiveLinkFlux = retryableReceiveLinkMono.repeat().filter(link -> !link.isDisposed());
        AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy((AmqpRetryOptions)this.connectionCacheWrapper.getRetryOptions());
        if (this.isOnV2) {
            MessageFlux messageFlux = new MessageFlux(receiveLinkFlux, this.receiverOptions.getPrefetchCount(), CreditFlowMode.RequestDriven, retryPolicy);
            newConsumer = new ServiceBusAsyncConsumer(linkName, messageFlux, this.messageSerializer, this.receiverOptions, this.instrumentation);
        } else {
            ServiceBusReceiveLinkProcessor linkMessageProcessor = (ServiceBusReceiveLinkProcessor)receiveLinkFlux.subscribeWith((Subscriber)new ServiceBusReceiveLinkProcessor(this.receiverOptions.getPrefetchCount(), retryPolicy));
            newConsumer = new ServiceBusAsyncConsumer(linkName, linkMessageProcessor, this.messageSerializer, this.receiverOptions);
        }
        if (this.consumer.compareAndSet(null, newConsumer)) {
            return newConsumer;
        }
        newConsumer.close();
        return this.consumer.get();
    }

    private String getLinkName(String sessionId) {
        if (!CoreUtils.isNullOrEmpty((CharSequence)sessionId)) {
            return this.isSessionEnabled ? this.sessionManager.getLinkName(sessionId) : null;
        }
        ServiceBusAsyncConsumer existing = this.consumer.get();
        return existing != null ? existing.getLinkName() : null;
    }

    private Mono<OffsetDateTime> renewSessionLock(String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
        }
        if (!this.isSessionEnabled) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        }
        String linkName = this.sessionManager.getLinkName(sessionId);
        return this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> this.tracer.traceMono("ServiceBus.renewSessionLock", channel.renewSessionLock(sessionId, linkName))).onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RENEW_LOCK));
    }

    private Mono<Void> renewSessionLock(String sessionId, Duration maxLockRenewalDuration) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "renewSessionLock")));
        }
        if (!this.isSessionEnabled) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot renew session lock on a non-session receiver."));
        }
        if (maxLockRenewalDuration == null) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'maxLockRenewalDuration' cannot be null."));
        }
        if (maxLockRenewalDuration.isNegative()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'maxLockRenewalDuration' cannot be negative."));
        }
        if (Objects.isNull(sessionId)) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new NullPointerException("'sessionId' cannot be null."));
        }
        if (sessionId.isEmpty()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("'sessionId' cannot be empty."));
        }
        LockRenewalOperation operation = new LockRenewalOperation(sessionId, maxLockRenewalDuration, true, this::renewSessionLock);
        this.renewalContainer.addOrUpdate(sessionId, OffsetDateTime.now().plus(maxLockRenewalDuration), operation);
        return operation.getCompletionOperation();
    }

    private Mono<Void> setSessionState(String sessionId, byte[] sessionState) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "setSessionState")));
        }
        if (!this.isSessionEnabled) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot set session state on a non-session receiver."));
        }
        assert (this.sessionManager != null);
        String linkName = this.sessionManager.getLinkName(sessionId);
        return this.tracer.traceMono("ServiceBus.setSessionState", this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> channel.setSessionState(sessionId, sessionState, linkName))).onErrorMap(err -> this.mapError((Throwable)err, ServiceBusErrorSource.RECEIVE));
    }

    private Mono<byte[]> getSessionState(String sessionId) {
        if (this.isDisposed.get()) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException(String.format(Messages.INVALID_OPERATION_DISPOSED_RECEIVER, "getSessionState")));
        }
        if (!this.isSessionEnabled) {
            return FluxUtil.monoError((ClientLogger)LOGGER, (RuntimeException)new IllegalStateException("Cannot get session state on a non-session receiver."));
        }
        assert (this.sessionManager != null);
        String linkName = this.sessionManager.getLinkName(sessionId);
        return this.tracer.traceMono("ServiceBus.setSessionState", this.connectionProcessor.flatMap(connection -> connection.getManagementNode(this.entityPath, this.entityType)).flatMap(channel -> channel.getSessionState(sessionId, linkName))).onErrorMap(err -> this.mapError((Throwable)err, ServiceBusErrorSource.RECEIVE));
    }

    ServiceBusReceiverInstrumentation getInstrumentation() {
        return this.instrumentation;
    }

    private Throwable mapError(Throwable throwable, ServiceBusErrorSource errorSource) {
        if (!(throwable instanceof ServiceBusException)) {
            return new ServiceBusException(throwable, errorSource);
        }
        return throwable;
    }

    boolean isConnectionClosed() {
        return this.connectionCacheWrapper.isChannelClosed();
    }

    boolean isManagementNodeLocksClosed() {
        return this.managementNodeLocks.isClosed();
    }

    boolean isRenewalContainerClosed() {
        return this.renewalContainer.isClosed();
    }

    boolean isSessionEnabled() {
        return this.isSessionEnabled;
    }

    boolean isAutoLockRenewRequested() {
        return this.receiverOptions.isAutoLockRenewEnabled();
    }

    boolean isV2() {
        return this.isOnV2;
    }

    Flux<ServiceBusReceivedMessage> nonSessionProcessorReceiveV2() {
        assert (this.isOnV2 && !this.isSessionEnabled);
        return this.getOrCreateConsumer().receive();
    }

    private Flux<ServiceBusReceivedMessage> nonSessionReactiveReceiveV2() {
        assert (this.isOnV2 && !this.isSessionEnabled);
        boolean enableAutoDisposition = this.receiverOptions.isEnableAutoComplete();
        boolean enableAutoLockRenew = this.receiverOptions.isAutoLockRenewEnabled();
        Flux messages = this.getOrCreateConsumer().receive().onErrorMap(throwable -> this.mapError((Throwable)throwable, ServiceBusErrorSource.RECEIVE));
        if (enableAutoDisposition | enableAutoLockRenew) {
            return new AutoDispositionLockRenew((Flux<? extends ServiceBusReceivedMessage>)messages, this, enableAutoDisposition, enableAutoLockRenew, this.completionLock);
        }
        return messages;
    }

    Flux<ServiceBusReceivedMessage> nonSessionSyncReceiveV2() {
        assert (this.isOnV2 && !this.isSessionEnabled);
        Flux messages = this.getOrCreateConsumer().receive();
        return this.receiverOptions.isAutoLockRenewEnabled() ? messages.doOnNext(this::beginLockRenewal) : messages;
    }

    private Flux<ServiceBusReceivedMessage> sessionReactiveReceiveV2() {
        assert (this.isOnV2 && this.isSessionEnabled && this.sessionManager instanceof ServiceBusSingleSessionManager);
        ServiceBusSingleSessionManager singleSessionManager = (ServiceBusSingleSessionManager)this.sessionManager;
        Flux<ServiceBusReceivedMessage> messages = singleSessionManager.receiveMessages();
        boolean enableAutoDisposition = this.receiverOptions.isEnableAutoComplete();
        if (enableAutoDisposition) {
            return new AutoDispositionLockRenew(messages, this, true, false, this.completionLock);
        }
        return messages;
    }

    Flux<ServiceBusReceivedMessage> sessionSyncReceiveV2() {
        assert (this.isOnV2 && this.isSessionEnabled && this.sessionManager instanceof ServiceBusSingleSessionManager);
        ServiceBusSingleSessionManager singleSessionManager = (ServiceBusSingleSessionManager)this.sessionManager;
        return singleSessionManager.receiveMessages();
    }

    Disposable beginLockRenewal(ServiceBusReceivedMessage message) {
        if (this.isSessionEnabled) {
            throw LOGGER.logExceptionAsError((RuntimeException)new IllegalStateException("Renewing message lock is an invalid operation when working with sessions."));
        }
        Duration maxRenewalDuration = this.receiverOptions.getMaxLockRenewDuration();
        Objects.requireNonNull(maxRenewalDuration, "'receivingOptions.maxAutoLockRenewDuration' is required for recurring lock renewal.");
        if (message == null) {
            return Disposables.disposed();
        }
        String lockToken = message.getLockToken();
        if (Objects.isNull(lockToken)) {
            LOGGER.atWarning().addKeyValue("sequenceNumber", message.getSequenceNumber()).log("Unexpected, LockToken is required for recurring lock renewal.");
            return Disposables.disposed();
        }
        OffsetDateTime initialExpireAt = message.getLockedUntil();
        if (Objects.isNull(initialExpireAt)) {
            LOGGER.atWarning().addKeyValue("sequenceNumber", message.getSequenceNumber()).log("Unexpected, LockedUntil is required for recurring lock renewal.");
            return Disposables.disposed();
        }
        Mono renewalMono = this.tracer.traceRenewMessageLock(this.renewMessageLock(lockToken).map(nextExpireAt -> {
            message.setLockedUntil((OffsetDateTime)nextExpireAt);
            return nextExpireAt;
        }), message);
        LockRenewalOperation recurringRenewal = new LockRenewalOperation(lockToken, maxRenewalDuration, false, __ -> renewalMono, initialExpireAt);
        try {
            this.renewalContainer.addOrUpdate(lockToken, OffsetDateTime.now().plus(maxRenewalDuration), recurringRenewal);
        }
        catch (Exception e) {
            LOGGER.atInfo().addKeyValue("lockToken", lockToken).log("Exception occurred while updating lockContainer.", new Object[]{e});
        }
        return Disposables.composite((Disposable[])new Disposable[]{() -> recurringRenewal.close(), () -> this.renewalContainer.remove(lockToken)});
    }
}

