package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpLink;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.ReactorReceiver;
import com.azure.core.amqp.implementation.ReactorSession;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/servicebus/implementation/ServiceBusReactorSession.class */
/**
 * A {@link ReactorSession} specialised for Service Bus.
 *
 * <p>Sender and receiver links created through this session carry Service Bus specific link properties
 * ({@code com.microsoft:timeout}, {@code com.microsoft:entity-type},
 * {@code com.microsoft:transfer-destination-address} and a client identifier) and, for session-aware receivers,
 * a {@code com.microsoft:session-filter} source filter.</p>
 *
 * <p>When the session was created with distributed-transaction support, every link creation is chained after
 * {@code getOrCreateTransactionCoordinator()} completes.</p>
 */
public class ServiceBusReactorSession extends ReactorSession implements ServiceBusSession {
    /** Source-filter key used to bind a receive link to a session. */
    static final Symbol SESSION_FILTER = Symbol.getSymbol("com.microsoft:session-filter");
    static final Symbol LOCKED_UNTIL_UTC = Symbol.getSymbol("com.microsoft:locked-until-utc");
    /** Link property carrying the (server-adjusted) operation timeout in milliseconds. */
    private static final Symbol LINK_TIMEOUT_PROPERTY = Symbol.getSymbol("com.microsoft:timeout");
    /** Link property carrying the numeric value of the {@link MessagingEntityType}. */
    private static final Symbol ENTITY_TYPE_PROPERTY = Symbol.getSymbol("com.microsoft:entity-type");
    /** Link property carrying the transfer ("send-via") destination entity path. */
    private static final Symbol LINK_TRANSFER_DESTINATION_PROPERTY = Symbol.getSymbol("com.microsoft:transfer-destination-address");
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReactorSession.class);
    private final AmqpRetryPolicy retryPolicy;
    private final TokenManagerProvider tokenManagerProvider;
    private final Mono<ClaimsBasedSecurityNode> cbsNodeSupplier;
    private final AmqpConnection amqpConnection;
    private final AmqpRetryOptions retryOptions;
    private final boolean distributedTransactionsSupport;

    /**
     * Creates a Service Bus session on top of the given AMQP session.
     *
     * <p>All arguments except {@code serviceBusCreateSessionOptions} are forwarded unchanged to the
     * {@link ReactorSession} constructor; a subset of them is additionally retained by this class.</p>
     *
     * @param amqpConnection connection that owns this session; also handed to receivers created by this session.
     * @param session proton-j session, forwarded to the superclass.
     * @param sessionHandler handler for the session, forwarded to the superclass.
     * @param str forwarded to the superclass (presumably the session name - confirm against ReactorSession).
     * @param reactorProvider forwarded to the superclass.
     * @param reactorHandlerProvider forwarded to the superclass.
     * @param mono supplier of the claims-based-security node; kept for authorizing transfer destinations.
     * @param tokenManagerProvider creates token managers; kept for authorizing transfer destinations.
     * @param messageSerializer forwarded to the superclass.
     * @param amqpRetryOptions retry options; the retry policy is derived from them via
     *     {@link RetryUtil#getRetryPolicy(AmqpRetryOptions)} and their try-timeout is passed to new receivers.
     * @param serviceBusCreateSessionOptions options from which the distributed-transaction flag is read.
     * @throws NullPointerException if {@code serviceBusCreateSessionOptions} is null.
     */
    public ServiceBusReactorSession(AmqpConnection amqpConnection, Session session, SessionHandler sessionHandler,
        String str, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider,
        Mono<ClaimsBasedSecurityNode> mono, TokenManagerProvider tokenManagerProvider,
        MessageSerializer messageSerializer, AmqpRetryOptions amqpRetryOptions,
        ServiceBusCreateSessionOptions serviceBusCreateSessionOptions) {
        super(amqpConnection, session, sessionHandler, str, reactorProvider, reactorHandlerProvider, mono,
            tokenManagerProvider, messageSerializer, amqpRetryOptions);
        this.amqpConnection = amqpConnection;
        this.retryOptions = amqpRetryOptions;
        this.retryPolicy = RetryUtil.getRetryPolicy(amqpRetryOptions);
        this.tokenManagerProvider = tokenManagerProvider;
        this.cbsNodeSupplier = mono;
        // Validated at the point of use so the failure stays an NPE raised at the same step as before,
        // now with a message that names the offending argument.
        this.distributedTransactionsSupport = Objects
            .requireNonNull(serviceBusCreateSessionOptions, "'serviceBusCreateSessionOptions' cannot be null.")
            .isDistributedTransactionsSupported();
    }

    /**
     * Creates a receive link without any source filter.
     *
     * @param str link name; must not be null.
     * @param str2 entity path; must not be null.
     * @param messagingEntityType entity type to advertise on the link, or null to omit the property.
     * @param duration operation timeout; must not be null.
     * @param amqpRetryPolicy retry policy; must not be null.
     * @param serviceBusReceiveMode receive mode that determines the link settle modes; must not be null.
     * @param str3 value stored under {@link AmqpConstants#CLIENT_RECEIVER_IDENTIFIER} in the link properties.
     * @return a Mono that emits the receive link.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusSession
    public Mono<ServiceBusReceiveLink> createConsumer(String str, String str2, MessagingEntityType messagingEntityType,
        Duration duration, AmqpRetryPolicy amqpRetryPolicy, ServiceBusReceiveMode serviceBusReceiveMode, String str3) {
        final Map<Symbol, Object> noFilters = new HashMap<>();
        return createConsumer(str, str2, messagingEntityType, duration, amqpRetryPolicy, serviceBusReceiveMode,
            noFilters, str3);
    }

    /**
     * Creates a receive link whose source carries a {@code com.microsoft:session-filter} entry.
     *
     * @param str link name; must not be null.
     * @param str2 entity path; must not be null.
     * @param messagingEntityType entity type to advertise on the link, or null to omit the property.
     * @param duration operation timeout; must not be null.
     * @param amqpRetryPolicy retry policy; must not be null.
     * @param serviceBusReceiveMode receive mode that determines the link settle modes; must not be null.
     * @param str3 value stored under {@link AmqpConstants#CLIENT_RECEIVER_IDENTIFIER} in the link properties.
     * @param str4 value stored under the session filter (presumably the session id - confirm with callers);
     *     it is stored as given, including when it is null.
     * @return a Mono that emits the receive link.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusSession
    public Mono<ServiceBusReceiveLink> createConsumer(String str, String str2, MessagingEntityType messagingEntityType,
        Duration duration, AmqpRetryPolicy amqpRetryPolicy, ServiceBusReceiveMode serviceBusReceiveMode, String str3,
        String str4) {
        final Map<Symbol, Object> sessionFilter = new HashMap<>();
        sessionFilter.put(SESSION_FILTER, str4);
        return createConsumer(str, str2, messagingEntityType, duration, amqpRetryPolicy, serviceBusReceiveMode,
            sessionFilter, str3);
    }

    /**
     * Gets or creates a sender link, optionally configured with a transfer destination.
     *
     * <p>When a transfer destination is supplied, a token manager for that destination is authorized first and
     * closed as soon as the authorization terminates; the sender link is then created.</p>
     *
     * @param str link name (only logged and forwarded here; not validated by this method).
     * @param str2 entity path; must not be null.
     * @param duration operation timeout; must not be null.
     * @param amqpRetryPolicy retry policy; must not be null.
     * @param str3 transfer entity path; null or empty means no transfer destination.
     * @param str4 value stored under {@link AmqpConstants#CLIENT_IDENTIFIER} in the link properties.
     * @return a Mono that emits the sender link.
     * @throws NullPointerException if {@code str2}, {@code duration} or {@code amqpRetryPolicy} is null.
     */
    @Override // com.azure.messaging.servicebus.implementation.ServiceBusSession
    public Mono<AmqpLink> createProducer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy,
        String str3, String str4) {
        Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        Objects.requireNonNull(duration, "'timeout' cannot be null.");
        Objects.requireNonNull(amqpRetryPolicy, "'retry' cannot be null.");

        final Duration serverTimeout = MessageUtils.adjustServerTimeout(duration);
        final Map<Symbol, Object> linkProperties = new HashMap<>();
        linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
        linkProperties.put(AmqpConstants.CLIENT_IDENTIFIER, str4);

        if (CoreUtils.isNullOrEmpty(str3)) {
            LOGGER.atVerbose()
                .addKeyValue("linkName", str)
                .addKeyValue("entityPath", str2)
                .log("Get or create sender link.");
            return createProducer(str, str2, duration, amqpRetryPolicy, linkProperties);
        }

        linkProperties.put(LINK_TRANSFER_DESTINATION_PROPERTY, str3);
        LOGGER.atVerbose()
            .addKeyValue("linkName", str)
            .addKeyValue("entityPath", str2)
            .addKeyValue("transferEntityPath", str3)
            .log("Get or create sender link.");

        final TokenManager transferTokenManager = this.tokenManagerProvider.getTokenManager(this.cbsNodeSupplier, str3);
        // The producer Mono is assembled here but only subscribed to once authorize() completes; the token
        // manager for the transfer destination is closed on any termination of the authorization.
        return transferTokenManager.authorize()
            .doFinally(signalType -> transferTokenManager.close())
            .then(createProducer(str, str2, duration, amqpRetryPolicy, linkProperties));
    }

    /**
     * Gets or creates a sender link with no additional link properties.
     *
     * <p>NOTE(review): presumably implements a method declared by a supertype; this source carries no
     * {@code @Override}, so confirm before adding one.</p>
     *
     * @param str link name.
     * @param str2 entity path.
     * @param duration operation timeout.
     * @param amqpRetryPolicy retry policy.
     * @return a Mono that emits the sender link.
     */
    public Mono<AmqpLink> createProducer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy) {
        // The typed cast keeps the call bound to the Map-based overload without resorting to a raw type.
        return createProducer(str, str2, duration, amqpRetryPolicy, (Map<Symbol, Object>) null);
    }

    /**
     * Delegates sender-link creation to {@link ReactorSession}; when distributed transactions are supported the
     * delegation happens only after {@code getOrCreateTransactionCoordinator()} has emitted.
     *
     * @param str link name.
     * @param str2 entity path.
     * @param duration operation timeout.
     * @param amqpRetryPolicy retry policy.
     * @param map link properties to set on the sender link; may be null.
     * @return a Mono that emits the sender link.
     */
    protected Mono<AmqpLink> createProducer(String str, String str2, Duration duration, AmqpRetryPolicy amqpRetryPolicy,
        Map<Symbol, Object> map) {
        if (!this.distributedTransactionsSupport) {
            return super.createProducer(str, str2, duration, amqpRetryPolicy, map);
        }
        return getOrCreateTransactionCoordinator()
            .flatMap(coordinator -> super.createProducer(str, str2, duration, amqpRetryPolicy, map));
    }

    /**
     * Wraps a proton-j receiver in a {@link ServiceBusReactorReceiver} bound to this session's connection, the
     * configured try-timeout and the session retry policy.
     *
     * <p>NOTE(review): looks like a factory hook invoked by {@link ReactorSession}; this source carries no
     * {@code @Override}, so confirm before adding one.</p>
     *
     * @param str entity path handed to the receiver (presumably - confirm against ServiceBusReactorReceiver).
     * @param receiver proton-j receiver to wrap.
     * @param receiveLinkHandler handler for the receive link.
     * @param tokenManager token manager associated with the link.
     * @param reactorProvider reactor provider for the receiver.
     * @return the new receiver.
     */
    protected ReactorReceiver createConsumer(String str, Receiver receiver, ReceiveLinkHandler receiveLinkHandler,
        TokenManager tokenManager, ReactorProvider reactorProvider) {
        return new ServiceBusReactorReceiver(this.amqpConnection, str, receiver, receiveLinkHandler, tokenManager,
            reactorProvider, this.retryOptions.getTryTimeout(), this.retryPolicy);
    }

    /**
     * Shared implementation behind both public {@code createConsumer} overloads: validates the arguments, builds
     * the receiver link properties, maps the receive mode to AMQP settle modes and creates the link.
     *
     * @param str link name; must not be null.
     * @param str2 entity path; must not be null.
     * @param messagingEntityType entity type to advertise on the link, or null to omit the property.
     * @param duration operation timeout; must not be null.
     * @param amqpRetryPolicy retry policy; must not be null.
     * @param serviceBusReceiveMode receive mode; must not be null.
     * @param map source filters for the link.
     * @param str3 value stored under {@link AmqpConstants#CLIENT_RECEIVER_IDENTIFIER} in the link properties.
     * @return a Mono that emits the receive link, or an error Mono for an unsupported receive mode.
     * @throws NullPointerException if any of the arguments documented as non-null is null.
     */
    private Mono<ServiceBusReceiveLink> createConsumer(String str, String str2, MessagingEntityType messagingEntityType,
        Duration duration, AmqpRetryPolicy amqpRetryPolicy, ServiceBusReceiveMode serviceBusReceiveMode,
        Map<Symbol, Object> map, String str3) {
        Objects.requireNonNull(str, "'linkName' cannot be null.");
        Objects.requireNonNull(str2, "'entityPath' cannot be null.");
        Objects.requireNonNull(duration, "'timeout' cannot be null.");
        Objects.requireNonNull(amqpRetryPolicy, "'retry' cannot be null.");
        Objects.requireNonNull(serviceBusReceiveMode, "'receiveMode' cannot be null.");

        final Map<Symbol, Object> linkProperties = new HashMap<>();
        final Duration serverTimeout = MessageUtils.adjustServerTimeout(duration);
        linkProperties.put(LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(serverTimeout.toMillis()));
        linkProperties.put(AmqpConstants.CLIENT_RECEIVER_IDENTIFIER, str3);
        if (messagingEntityType != null) {
            linkProperties.put(ENTITY_TYPE_PROPERTY, Integer.valueOf(messagingEntityType.getValue()));
        }

        // Blank finals: assigned exactly once per supported mode, so the lambda below can capture them directly.
        final SenderSettleMode senderSettleMode;
        final ReceiverSettleMode receiverSettleMode;
        switch (serviceBusReceiveMode) {
            case PEEK_LOCK:
                senderSettleMode = SenderSettleMode.UNSETTLED;
                receiverSettleMode = ReceiverSettleMode.SECOND;
                break;
            case RECEIVE_AND_DELETE:
                senderSettleMode = SenderSettleMode.SETTLED;
                receiverSettleMode = ReceiverSettleMode.FIRST;
                break;
            default:
                return Mono.error(new RuntimeException("ReceiveMode is not supported: " + serviceBusReceiveMode));
        }

        if (this.distributedTransactionsSupport) {
            // Create the link only after the transaction coordinator is available.
            return getOrCreateTransactionCoordinator().flatMap(coordinator ->
                createConsumer(str, str2, duration, amqpRetryPolicy, map, linkProperties, null, senderSettleMode,
                    receiverSettleMode).cast(ServiceBusReceiveLink.class));
        }
        return createConsumer(str, str2, duration, amqpRetryPolicy, map, linkProperties, null, senderSettleMode,
            receiverSettleMode).cast(ServiceBusReceiveLink.class);
    }
}
