/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.implementation.AmqpLinkProvider;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.AzureTokenManagerProvider;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManager;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.amqp.models.CbsAuthorizationType;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.servicebus.implementation.ManagementChannel;
import com.azure.messaging.servicebus.implementation.MessagingEntityType;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpConnection;
import com.azure.messaging.servicebus.implementation.ServiceBusAmqpLinkProvider;
import com.azure.messaging.servicebus.implementation.ServiceBusCreateSessionOptions;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode;
import com.azure.messaging.servicebus.implementation.ServiceBusReactorSession;
import com.azure.messaging.servicebus.implementation.ServiceBusReceiveLink;
import com.azure.messaging.servicebus.implementation.ServiceBusSession;
import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/**
 * A {@link ReactorConnection} specialised for Service Bus.
 *
 * <p>On top of the base connection it keeps a per-entity cache of {@link ServiceBusManagementNode}s and creates
 * send and receive links on {@link ServiceBusSession}s. When distributed transaction support is enabled, every
 * session request is redirected to a single shared session name so that all links live on the same session.</p>
 */
public class ServiceBusReactorAmqpConnection extends ReactorConnection implements ServiceBusAmqpConnection {
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private static final String MANAGEMENT_ADDRESS = "$management";
    private static final String CROSS_ENTITY_TRANSACTIONS_LINK_NAME = "crossentity-coordinator";
    /** Scope handed to the token manager provider when authorizing a management node. */
    private static final String AZURE_ACTIVE_DIRECTORY_SCOPE = "https://servicebus.azure.net/.default";
    private static final ClientLogger LOGGER = new ClientLogger(ServiceBusReactorAmqpConnection.class);

    /** Management nodes keyed by {@code "<entityType>-<entityPath>"}. */
    private final ConcurrentHashMap<String, ServiceBusManagementNode> managementNodes = new ConcurrentHashMap<>();
    private final String connectionId;
    private final ReactorProvider reactorProvider;
    private final ReactorHandlerProvider handlerProvider;
    private final ServiceBusAmqpLinkProvider linkProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final AmqpRetryOptions retryOptions;
    private final MessageSerializer messageSerializer;
    private final Scheduler scheduler;
    private final String fullyQualifiedNamespace;
    private final CbsAuthorizationType authorizationType;
    private final boolean distributedTransactionsSupport;
    private final boolean isV2;

    /**
     * Creates a connection whose base class is configured with {@link SenderSettleMode#SETTLED} and
     * {@link ReceiverSettleMode#FIRST}.
     *
     * @param connectionId Identifier of this connection; also used in error messages.
     * @param connectionOptions Source of the authorization type, retry options, scheduler and fully qualified
     *     namespace used by this connection.
     * @param reactorProvider Passed to the base class and to every session created by this connection.
     * @param handlerProvider Passed to the base class and to every session created by this connection.
     * @param linkProvider Passed to the base class and to every session created by this connection.
     * @param tokenManagerProvider Passed to the base class and to every session created by this connection.
     * @param messageSerializer Serializer handed to management nodes and sessions.
     * @param distributedTransactionsSupport When {@code true}, all sessions are requested under the shared
     *     {@code "crossentity-coordinator"} name instead of the caller-supplied name.
     * @param isV2 Forwarded unchanged to the base class and to created sessions.
     */
    public ServiceBusReactorAmqpConnection(String connectionId, ConnectionOptions connectionOptions,
        ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider,
        ServiceBusAmqpLinkProvider linkProvider, TokenManagerProvider tokenManagerProvider,
        MessageSerializer messageSerializer, boolean distributedTransactionsSupport, boolean isV2) {
        super(connectionId, connectionOptions, reactorProvider, handlerProvider, (AmqpLinkProvider) linkProvider,
            tokenManagerProvider, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.FIRST, isV2);
        this.connectionId = connectionId;
        this.reactorProvider = reactorProvider;
        this.handlerProvider = handlerProvider;
        this.linkProvider = linkProvider;
        this.tokenManagerProvider = tokenManagerProvider;
        this.authorizationType = connectionOptions.getAuthorizationType();
        this.retryOptions = connectionOptions.getRetry();
        this.messageSerializer = messageSerializer;
        this.scheduler = connectionOptions.getScheduler();
        this.fullyQualifiedNamespace = connectionOptions.getFullyQualifiedNamespace();
        this.distributedTransactionsSupport = distributedTransactionsSupport;
        this.isV2 = isV2;
    }

    /**
     * Gets the management node for an entity, creating and caching it on first use.
     *
     * <p>A cached node is returned immediately. Otherwise the connection is awaited, a token manager for
     * {@code entityPath} is authorized, and only after that authorization succeeds is the node created (or an
     * instance registered concurrently by another caller reused). A failed authorization therefore leaves the
     * cache untouched, so a later call authorizes again instead of receiving a node that was never authorized.</p>
     *
     * @param entityPath Path of the entity the management node talks to.
     * @param entityType Type of the entity; combined with {@code entityPath} to form the cache key.
     * @return A Mono emitting the management node, or an {@link IllegalStateException} error if this connection
     *     is already disposed.
     */
    @Override
    public Mono<ServiceBusManagementNode> getManagementNode(String entityPath, MessagingEntityType entityType) {
        if (isDisposed()) {
            return FluxUtil.monoError(LOGGER.atWarning(), new IllegalStateException(String.format(
                "connectionId[%s]: Connection is disposed. Cannot get management instance for '%s'",
                connectionId, entityPath)));
        }

        final String entityTypePath = String.join("-", entityType.toString(), entityPath);
        final ServiceBusManagementNode existing = managementNodes.get(entityTypePath);
        if (existing != null) {
            return Mono.just(existing);
        }

        final Mono<ServiceBusManagementNode> authorizedNode = Mono.defer(() -> {
            final TokenManager tokenManager = new AzureTokenManagerProvider(authorizationType,
                fullyQualifiedNamespace, AZURE_ACTIVE_DIRECTORY_SCOPE)
                .getTokenManager(getClaimsBasedSecurityNode(), entityPath);

            // The supplier keeps the cache update lazy: it must not run until authorize() has completed.
            // Passing the compute(...) result straight to thenReturn(...) would evaluate it at assembly time.
            final Mono<ServiceBusManagementNode> node = Mono.fromSupplier(
                () -> getOrCreateManagementNode(entityPath, entityTypePath, tokenManager));
            return tokenManager.authorize().then(node);
        });

        return getReactorConnection().then(authorizedNode);
    }

    /**
     * Atomically returns the node cached under {@code entityTypePath}, or creates and caches a new one that is
     * backed by {@code tokenManager}. When a node already exists the supplied token manager is closed because
     * nothing will use it.
     */
    private ServiceBusManagementNode getOrCreateManagementNode(String entityPath, String entityTypePath,
        TokenManager tokenManager) {
        return managementNodes.compute(entityTypePath, (key, current) -> {
            if (current != null) {
                LOGGER.info("A management node exists already, returning it.");
                tokenManager.close();
                return current;
            }

            final String sessionName = entityPath + "-" + MANAGEMENT_SESSION_NAME;
            final String linkName = entityPath + "-" + MANAGEMENT_LINK_NAME;
            final String address = entityPath + "/" + MANAGEMENT_ADDRESS;
            LOGGER.atInfo()
                .addKeyValue("linkName", linkName)
                .addKeyValue("entityPath", entityPath)
                .addKeyValue("address", address)
                .log("Creating management node.");

            return new ManagementChannel(
                (Mono<RequestResponseChannel>) createRequestResponseChannel(sessionName, linkName, address),
                fullyQualifiedNamespace, entityPath, tokenManager, messageSerializer,
                retryOptions.getTryTimeout());
        });
    }

    /**
     * Creates a send link on the session named after {@code linkName}.
     *
     * @param linkName Name of the session to use; the producer itself is named {@code linkName + entityPath}.
     * @param entityPath Entity the link sends to.
     * @param retryOptions Options supplying the retry policy and try-timeout for the link (these take the place
     *     of the connection-level retry options for this call).
     * @param transferEntityPath Forwarded to the session's producer factory.
     * @param clientIdentifier Forwarded to the session's producer factory.
     * @return A Mono emitting the send link.
     */
    @Override
    public Mono<AmqpSendLink> createSendLink(String linkName, String entityPath, AmqpRetryOptions retryOptions,
        String transferEntityPath, String clientIdentifier) {
        return createSession(linkName).cast(ServiceBusSession.class).flatMap(session -> {
            LOGGER.atVerbose().addKeyValue("linkName", linkName).log("Get or create sender link.");
            final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions);
            return session.createProducer(linkName + entityPath, entityPath, retryOptions.getTryTimeout(),
                retryPolicy, transferEntityPath, clientIdentifier).cast(AmqpSendLink.class);
        });
    }

    /**
     * Creates a receive link on the session named after {@code entityPath}, using the connection-level retry
     * options.
     *
     * @param linkName Name of the receive link.
     * @param entityPath Entity to receive from; also used as the session name.
     * @param receiveMode Receive mode for the consumer.
     * @param transferEntityPath Not used by this implementation.
     * @param entityType Type of the entity.
     * @param clientIdentifier Forwarded to the session's consumer factory.
     * @return A Mono emitting the receive link.
     */
    @Override
    public Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath,
        ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType,
        String clientIdentifier) {
        return createSession(entityPath).cast(ServiceBusSession.class).flatMap(session -> {
            LOGGER.atVerbose().addKeyValue("entityPath", entityPath).log("Get or create consumer.");
            final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(this.retryOptions);
            return session.createConsumer(linkName, entityPath, entityType, this.retryOptions.getTryTimeout(),
                retryPolicy, receiveMode, clientIdentifier);
        });
    }

    /**
     * Creates (or looks up, as decided by the base class) a session. When distributed transaction support is
     * enabled the requested name is ignored and the shared {@code "crossentity-coordinator"} name is used.
     *
     * @param sessionName Requested session name.
     * @return A Mono emitting the session.
     */
    @Override
    public Mono<AmqpSession> createSession(String sessionName) {
        final String effectiveName = distributedTransactionsSupport
            ? CROSS_ENTITY_TRANSACTIONS_LINK_NAME
            : sessionName;
        return super.createSession(effectiveName);
    }

    /**
     * Creates a receive link bound to a specific Service Bus session id, on the AMQP session named after
     * {@code entityPath}, using the connection-level retry options.
     *
     * @param linkName Name of the receive link.
     * @param entityPath Entity to receive from; also used as the AMQP session name.
     * @param receiveMode Receive mode for the consumer.
     * @param transferEntityPath Not used by this implementation.
     * @param entityType Type of the entity.
     * @param clientIdentifier Forwarded to the session's consumer factory.
     * @param sessionId Service Bus session id forwarded to the session's consumer factory.
     * @return A Mono emitting the receive link.
     */
    @Override
    public Mono<ServiceBusReceiveLink> createReceiveLink(String linkName, String entityPath,
        ServiceBusReceiveMode receiveMode, String transferEntityPath, MessagingEntityType entityType,
        String clientIdentifier, String sessionId) {
        return createSession(entityPath).cast(ServiceBusSession.class).flatMap(session -> {
            LOGGER.atVerbose().addKeyValue("entityPath", entityPath).log("Get or create consumer.");
            final AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(this.retryOptions);
            return session.createConsumer(linkName, entityPath, entityType, this.retryOptions.getTryTimeout(),
                retryPolicy, receiveMode, clientIdentifier, sessionId);
        });
    }

    /**
     * Wraps a proton-j session in a {@link ServiceBusReactorSession} configured with this connection's providers,
     * serializer, retry options and distributed-transaction setting.
     *
     * @param sessionName Name of the session.
     * @param session Underlying proton-j session.
     * @param handler Handler associated with the session.
     * @return The Service Bus session wrapper.
     */
    @Override
    protected AmqpSession createSession(String sessionName, Session session, SessionHandler handler) {
        return new ServiceBusReactorSession(this, session, handler, sessionName, this.reactorProvider,
            this.handlerProvider, this.linkProvider,
            (Mono<ClaimsBasedSecurityNode>) this.getClaimsBasedSecurityNode(), this.tokenManagerProvider,
            this.messageSerializer, this.retryOptions,
            new ServiceBusCreateSessionOptions(this.distributedTransactionsSupport), this.isV2);
    }
}

