/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.servicebus.primitives;

import com.microsoft.azure.servicebus.ClientSettings;
import com.microsoft.azure.servicebus.amqp.BaseLinkHandler;
import com.microsoft.azure.servicebus.amqp.ConnectionHandler;
import com.microsoft.azure.servicebus.amqp.DispatchHandler;
import com.microsoft.azure.servicebus.amqp.IAmqpConnection;
import com.microsoft.azure.servicebus.amqp.ProtonUtil;
import com.microsoft.azure.servicebus.amqp.ReactorDispatcher;
import com.microsoft.azure.servicebus.amqp.ReactorHandler;
import com.microsoft.azure.servicebus.primitives.AsyncUtil;
import com.microsoft.azure.servicebus.primitives.ClientEntity;
import com.microsoft.azure.servicebus.primitives.CommonRequestResponseOperations;
import com.microsoft.azure.servicebus.primitives.CommunicationException;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.microsoft.azure.servicebus.primitives.ExceptionUtil;
import com.microsoft.azure.servicebus.primitives.MessagingEntityType;
import com.microsoft.azure.servicebus.primitives.RequestResponseLink;
import com.microsoft.azure.servicebus.primitives.RequestResponseLinkcache;
import com.microsoft.azure.servicebus.primitives.RetryPolicy;
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
import com.microsoft.azure.servicebus.primitives.StringUtil;
import com.microsoft.azure.servicebus.primitives.TimeoutException;
import com.microsoft.azure.servicebus.primitives.Timer;
import com.microsoft.azure.servicebus.primitives.TimerType;
import com.microsoft.azure.servicebus.primitives.Util;
import com.microsoft.azure.servicebus.security.SecurityToken;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Extendable;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.engine.HandlerException;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.reactor.Reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

public class MessagingFactory
extends ClientEntity
implements IAmqpConnection {
    private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(MessagingFactory.class);
    public static final ExecutorService INTERNAL_THREAD_POOL = Executors.newCachedThreadPool();
    private static final String REACTOR_THREAD_NAME_PREFIX = "ReactorThread";
    private static final int MAX_CBS_LINK_CREATION_ATTEMPTS = 3;
    private final String hostName;
    private final CompletableFuture<Void> connetionCloseFuture;
    private final ConnectionHandler connectionHandler;
    private final ReactorHandler reactorHandler;
    private final LinkedList<Link> registeredLinks;
    private final Object reactorLock;
    private final RequestResponseLinkcache managementLinksCache;
    private Reactor reactor;
    private ReactorDispatcher reactorScheduler;
    private Connection connection;
    private CompletableFuture<MessagingFactory> factoryOpenFuture;
    private CompletableFuture<Void> cbsLinkCreationFuture;
    private RequestResponseLink cbsLink;
    private int cbsLinkCreationAttempts = 0;
    private Throwable lastCBSLinkCreationException = null;
    private final ClientSettings clientSettings;
    private final URI namespaceEndpointUri;

    /**
     * Builds a factory bound to the given namespace endpoint and registers its client id with
     * {@code Timer}. No reactor is started and no connection is created here; the reactor
     * handler created in this constructor opens the connection once the reactor initializes.
     *
     * @param namespaceEndpointUri endpoint whose host part becomes the AMQP host name
     * @param clientSettings settings later consulted for operation timeout, retry policy and token provider
     */
    private MessagingFactory(URI namespaceEndpointUri, ClientSettings clientSettings) {
        super("MessagingFactory".concat(StringUtil.getShortRandomString()));
        this.namespaceEndpointUri = namespaceEndpointUri;
        this.clientSettings = clientSettings;
        this.hostName = namespaceEndpointUri.getHost();
        this.registeredLinks = new LinkedList<>();
        this.connetionCloseFuture = new CompletableFuture<>();
        this.reactorLock = new Object();
        this.connectionHandler = new ConnectionHandler(this);
        this.factoryOpenFuture = new CompletableFuture<>();
        this.cbsLinkCreationFuture = new CompletableFuture<>();
        this.managementLinksCache = new RequestResponseLinkcache(this);
        this.reactorHandler = new ReactorHandler() {
            /** Opens the connection to the host as soon as the reactor has initialized. */
            @Override
            public void onReactorInit(Event event) {
                super.onReactorInit(event);
                final Reactor initializedReactor = event.getReactor();
                TRACE_LOGGER.info("Creating connection to host '{}:{}'", MessagingFactory.this.hostName, 5671);
                MessagingFactory.this.connection = initializedReactor.connectionToHost(
                        MessagingFactory.this.hostName, 5671, MessagingFactory.this.connectionHandler);
            }
        };
        Timer.register(this.getClientId());
    }

    /**
     * @return the host name extracted from the namespace endpoint URI at construction time
     */
    String getHostName() {
        return hostName;
    }

    /**
     * Reads the current reactor while holding {@code reactorLock}.
     *
     * @return the reactor most recently stored by {@code startReactor}, or {@code null} if none has been stored
     */
    private Reactor getReactor() {
        synchronized (this.reactorLock) {
            return this.reactor;
        }
    }

    /**
     * Reads the current reactor dispatcher while holding {@code reactorLock}.
     *
     * @return the dispatcher most recently stored by {@code startReactor}, or {@code null} if none has been stored
     */
    private ReactorDispatcher getReactorScheduler() {
        synchronized (this.reactorLock) {
            return this.reactorScheduler;
        }
    }

    /**
     * Creates a new reactor for the given handler, publishes it together with a matching
     * dispatcher under {@code reactorLock}, and runs it on a freshly started, uniquely named thread.
     *
     * @param reactorHandler handler passed to {@code ProtonUtil.reactor}
     * @throws IOException if creating the reactor or its dispatcher fails
     */
    private void startReactor(ReactorHandler reactorHandler) throws IOException {
        TRACE_LOGGER.info("Creating and starting reactor");
        final Reactor freshReactor = ProtonUtil.reactor(reactorHandler);
        synchronized (this.reactorLock) {
            this.reactor = freshReactor;
            this.reactorScheduler = new ReactorDispatcher(freshReactor);
        }
        // RunReactor captures the reactor via getReactor() in its constructor, so it must be
        // instantiated only after the field assignment above.
        final String threadName = REACTOR_THREAD_NAME_PREFIX + UUID.randomUUID();
        new Thread(new RunReactor(), threadName).start();
        TRACE_LOGGER.info("Started reactor");
    }

    /**
     * Returns the current connection, first replacing it with a new connection to the host when
     * there is none yet or when either its local or remote endpoint state is {@code CLOSED}.
     *
     * @return the (possibly newly created) connection
     */
    Connection getConnection() {
        final boolean usable = this.connection != null
                && this.connection.getLocalState() != EndpointState.CLOSED
                && this.connection.getRemoteState() != EndpointState.CLOSED;
        if (!usable) {
            TRACE_LOGGER.info("Creating connection to host '{}:{}'", this.hostName, 5671);
            this.connection = this.getReactor().connectionToHost(this.hostName, 5671, this.connectionHandler);
        }
        return this.connection;
    }

    /**
     * @return the operation timeout configured in this factory's client settings
     */
    public Duration getOperationTimeout() {
        return clientSettings.getOperationTimeout();
    }

    /**
     * @return the retry policy configured in this factory's client settings
     */
    public RetryPolicy getRetryPolicy() {
        return clientSettings.getRetryPolicy();
    }

    /**
     * @return the client settings this factory was created with
     */
    public ClientSettings getClientSetttings() {
        return clientSettings;
    }

    /**
     * Converts the namespace name to an endpoint URI and starts creating a factory for it.
     *
     * @param sbNamespaceName namespace name handed to {@code Util.convertNamespaceToEndPointURI}
     * @param clientSettings settings for the new factory
     * @return the new factory's open future
     */
    public static CompletableFuture<MessagingFactory> createFromNamespaceNameAsyc(String sbNamespaceName, ClientSettings clientSettings) {
        final URI endpointUri = Util.convertNamespaceToEndPointURI(sbNamespaceName);
        return createFromNamespaceEndpointURIAsyc(endpointUri, clientSettings);
    }

    /**
     * Creates a factory for the given endpoint and starts its reactor.
     *
     * @param namespaceEndpointURI endpoint of the namespace to connect to
     * @param clientSettings settings for the new factory
     * @return the factory's open future; it is completed exceptionally right away when starting
     *         the reactor fails with an {@link IOException}
     */
    public static CompletableFuture<MessagingFactory> createFromNamespaceEndpointURIAsyc(URI namespaceEndpointURI, ClientSettings clientSettings) {
        if (TRACE_LOGGER.isInfoEnabled()) {
            TRACE_LOGGER.info("Creating messaging factory from namespace endpoint uri '{}'", namespaceEndpointURI.toString());
        }
        final MessagingFactory factory = new MessagingFactory(namespaceEndpointURI, clientSettings);
        try {
            factory.startReactor(factory.reactorHandler);
        } catch (IOException startFailure) {
            final Marker fatal = MarkerFactory.getMarker("FATAL");
            TRACE_LOGGER.error(fatal, "Starting reactor failed", startFailure);
            factory.factoryOpenFuture.completeExceptionally(startFailure);
        }
        return factory.factoryOpenFuture;
    }

    /**
     * Blocking variant of {@code createFromNamespaceNameAsyc}.
     *
     * @param sbNamespaceName namespace name to connect to
     * @param clientSettings settings for the new factory
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ServiceBusException if opening the factory fails
     */
    public static MessagingFactory createFromNamespaceName(String sbNamespaceName, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromNamespaceNameAsyc(sbNamespaceName, clientSettings);
        return completeFuture(pendingFactory);
    }

    /**
     * Blocking variant of {@code createFromNamespaceEndpointURIAsyc}.
     *
     * @param namespaceEndpointURI endpoint of the namespace to connect to
     * @param clientSettings settings for the new factory
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ServiceBusException if opening the factory fails
     */
    public static MessagingFactory createFromNamespaceEndpointURI(URI namespaceEndpointURI, ClientSettings clientSettings) throws InterruptedException, ServiceBusException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromNamespaceEndpointURIAsyc(namespaceEndpointURI, clientSettings);
        return completeFuture(pendingFactory);
    }

    /**
     * Starts creating a factory from the endpoint and client settings described by the builder.
     *
     * @param builder connection string builder providing the endpoint and the client settings
     * @return the new factory's open future
     */
    public static CompletableFuture<MessagingFactory> createFromConnectionStringBuilderAsync(ConnectionStringBuilder builder) {
        if (TRACE_LOGGER.isInfoEnabled()) {
            TRACE_LOGGER.info("Creating messaging factory from connection string '{}'", builder.toLoggableString());
        }
        final URI endpoint = builder.getEndpoint();
        final ClientSettings settings = Util.getClientSettingsFromConnectionStringBuilder(builder);
        return createFromNamespaceEndpointURIAsyc(endpoint, settings);
    }

    /**
     * Wraps the connection string in a {@code ConnectionStringBuilder} and starts creating a factory from it.
     *
     * @param connectionString connection string passed to the {@code ConnectionStringBuilder} constructor
     * @return the new factory's open future
     */
    public static CompletableFuture<MessagingFactory> createFromConnectionStringAsync(String connectionString) {
        return createFromConnectionStringBuilderAsync(new ConnectionStringBuilder(connectionString));
    }

    /**
     * Blocking variant of {@code createFromConnectionStringBuilderAsync}; waits with a plain
     * {@code get()}, so failures surface wrapped in an {@link ExecutionException}.
     *
     * @param builder connection string builder providing the endpoint and the client settings
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if opening the factory fails
     */
    public static MessagingFactory createFromConnectionStringBuilder(ConnectionStringBuilder builder) throws InterruptedException, ExecutionException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromConnectionStringBuilderAsync(builder);
        return pendingFactory.get();
    }

    /**
     * Blocking variant of {@code createFromConnectionStringAsync}; waits with a plain
     * {@code get()}, so failures surface wrapped in an {@link ExecutionException}.
     *
     * @param connectionString connection string describing the namespace to connect to
     * @return the opened factory
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if opening the factory fails
     */
    public static MessagingFactory createFromConnectionString(String connectionString) throws InterruptedException, ExecutionException {
        final CompletableFuture<MessagingFactory> pendingFactory = createFromConnectionStringAsync(connectionString);
        return pendingFactory.get();
    }

    /**
     * Connection-open callback: completes the factory open future with this instance if it is
     * still pending, and kicks off CBS link creation when no CBS link exists yet.
     */
    @Override
    public void onConnectionOpen() {
        final boolean firstOpen = !this.factoryOpenFuture.isDone();
        if (firstOpen) {
            TRACE_LOGGER.info("MessagingFactory opened.");
            AsyncUtil.completeFuture(this.factoryOpenFuture, this);
        }
        TRACE_LOGGER.info("Connection opened to host.");
        if (this.cbsLink != null) {
            return;
        }
        this.createCBSLinkAsync();
    }

    /**
     * Connection-error callback. While the factory is still opening, the open future is failed
     * and the factory is marked closed; otherwise the connection and its registered links are
     * closed. If the factory is closing or closed and the close future is still pending, that
     * future is completed and the client id is unregistered from {@code Timer}.
     *
     * @param error the error condition reported for the connection; may be {@code null}
     */
    @Override
    public void onConnectionError(ErrorCondition error) {
        final boolean hasCondition = error != null && error.getCondition() != null;
        if (hasCondition) {
            TRACE_LOGGER.error("Connection error. '{}'", error);
        }

        if (this.factoryOpenFuture.isDone()) {
            this.closeConnection(error, null);
        } else {
            AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, ExceptionUtil.toException(error));
            this.setClosed();
        }

        final boolean closePending = this.getIsClosingOrClosed() && !this.connetionCloseFuture.isDone();
        if (closePending) {
            TRACE_LOGGER.info("Connection to host closed.");
            AsyncUtil.completeFuture(this.connetionCloseFuture, null);
            Timer.unregister(this.getClientId());
        }
    }

    /**
     * Reacts to a failure of the reactor loop. While the factory is still opening, the open
     * future is failed and the factory is marked closed. Once open (and not closing or closed),
     * a new reactor is started and the current connection with its registered links is closed.
     *
     * @param cause the exception that stopped the reactor
     */
    private void onReactorError(Exception cause) {
        if (!this.factoryOpenFuture.isDone()) {
            TRACE_LOGGER.error("Reactor error occured", cause);
            AsyncUtil.completeFutureExceptionally(this.factoryOpenFuture, cause);
            this.setClosed();
            return;
        }
        if (this.getIsClosingOrClosed()) {
            return;
        }

        TRACE_LOGGER.warn("Reactor error occured", cause);
        try {
            this.startReactor(this.reactorHandler);
        } catch (IOException restartFailure) {
            final Marker fatal = MarkerFactory.getMarker("FATAL");
            TRACE_LOGGER.error(fatal, "Re-starting reactor failed with exception.", restartFailure);
            // NOTE(review): this retries immediately by recursion with no bound or delay; if
            // restarting keeps failing the recursion only ends with a stack overflow.
            this.onReactorError(cause);
        }
        this.closeConnection(null, cause);
    }

    /**
     * Closes every registered link and the current connection, then notifies each link's
     * {@code BaseLinkHandler} (if it has one) about the closure. Does nothing when there is no
     * connection.
     *
     * @param error error condition to report to the link handlers; takes precedence when non-null
     * @param cause exception reported to the link handlers when {@code error} is {@code null}
     */
    private void closeConnection(ErrorCondition error, Exception cause) {
        final Connection activeConnection = this.connection;
        if (activeConnection == null) {
            return;
        }

        // Snapshot and clear the registrations before closing anything.
        final Link[] linksToClose = this.registeredLinks.toArray(new Link[0]);
        this.registeredLinks.clear();
        final int linkCount = linksToClose.length;

        TRACE_LOGGER.debug("Closing all links on the connection. Number of links '{}'", linkCount);
        for (Link openLink : linksToClose) {
            openLink.close();
        }
        TRACE_LOGGER.debug("Closed all links on the connection. Number of links '{}'", linkCount);

        if (activeConnection.getLocalState() != EndpointState.CLOSED) {
            TRACE_LOGGER.info("Closing connection to host");
            activeConnection.close();
        }

        for (Link closedLink : linksToClose) {
            final Handler attachedHandler = BaseHandler.getHandler((Extendable) closedLink);
            if (!(attachedHandler instanceof BaseLinkHandler)) {
                continue;
            }
            final BaseLinkHandler linkHandler = (BaseLinkHandler) attachedHandler;
            if (error == null) {
                linkHandler.processOnClose(closedLink, cause);
            } else {
                linkHandler.processOnClose(closedLink, error);
            }
        }
    }

    /**
     * Starts closing this factory: closes the CBS link (if any), releases the cached management
     * links, fails a still-pending CBS link creation future and finally closes the connection on
     * the reactor thread.
     *
     * <p>The remaining close steps run even when closing the CBS link fails. Previously a failed
     * CBS link close skipped every later stage, so the connection was never closed and the
     * returned future never completed.
     *
     * @return a future completed once the connection is reported closed; it completes
     *         exceptionally with a {@code TimeoutException} if that does not happen within the
     *         operation timeout, or with the {@link IOException} raised while scheduling the
     *         close on the reactor thread. Already completed if the factory is already closed.
     */
    @Override
    protected CompletableFuture<Void> onClose() {
        if (this.getIsClosed()) {
            return CompletableFuture.completedFuture(null);
        }

        TRACE_LOGGER.info("Closing messaging factory");
        // Only completion of the CBS link close matters here, not its result type.
        final CompletableFuture<?> cbsLinkCloseFuture;
        if (this.cbsLink == null) {
            cbsLinkCloseFuture = CompletableFuture.completedFuture(null);
        } else {
            TRACE_LOGGER.info("Closing CBS link");
            cbsLinkCloseFuture = this.cbsLink.closeAsync();
        }

        cbsLinkCloseFuture.handle((ignored, cbsLinkCloseError) -> {
            // handle(), unlike thenRun(), also runs after an exceptional completion, so a
            // failed CBS link close cannot leave the factory half closed.
            if (cbsLinkCloseError != null) {
                TRACE_LOGGER.warn("Closing CBS link failed. Continuing to close messaging factory.", cbsLinkCloseError);
            }
            this.managementLinksCache.freeAsync();
            return null;
        }).thenRun(() -> {
            if (this.cbsLinkCreationFuture != null && !this.cbsLinkCreationFuture.isDone()) {
                this.cbsLinkCreationFuture.completeExceptionally(new Exception("Connection closed."));
            }
            if (this.connection != null && this.connection.getRemoteState() != EndpointState.CLOSED) {
                try {
                    this.scheduleOnReactorThread(new DispatchHandler() {
                        @Override
                        public void onEvent() {
                            if (MessagingFactory.this.connection != null && MessagingFactory.this.connection.getLocalState() != EndpointState.CLOSED) {
                                TRACE_LOGGER.info("Closing connection to host");
                                MessagingFactory.this.connection.close();
                            }
                        }
                    });
                } catch (IOException e) {
                    this.connetionCloseFuture.completeExceptionally(e);
                }
                // Fail the close future if the connection close is not reported in time.
                Timer.schedule(new Runnable() {
                    @Override
                    public void run() {
                        if (!MessagingFactory.this.connetionCloseFuture.isDone()) {
                            String errorMessage = "Closing MessagingFactory timed out.";
                            TRACE_LOGGER.warn(errorMessage);
                            AsyncUtil.completeFutureExceptionally(MessagingFactory.this.connetionCloseFuture, new TimeoutException(errorMessage));
                        }
                    }
                }, this.clientSettings.getOperationTimeout(), TimerType.OneTimeRun);
            } else {
                this.connetionCloseFuture.complete(null);
                Timer.unregister(this.getClientId());
            }
        });
        return this.connetionCloseFuture;
    }

    /**
     * Adds the link to the list of links that are closed and notified when the connection is closed.
     *
     * @param link link to register; {@code null} is ignored
     */
    @Override
    public void registerForConnectionError(Link link) {
        if (link == null) {
            return;
        }
        this.registeredLinks.add(link);
    }

    /**
     * Removes the link from the list of links that are closed and notified when the connection is closed.
     *
     * @param link link to deregister; {@code null} is ignored
     */
    @Override
    public void deregisterForConnectionError(Link link) {
        if (link == null) {
            return;
        }
        this.registeredLinks.remove(link);
    }

    /**
     * Hands the handler to the current reactor dispatcher.
     *
     * @param handler handler passed to the dispatcher's {@code invoke}
     * @throws IOException if the dispatcher's {@code invoke} fails
     */
    void scheduleOnReactorThread(DispatchHandler handler) throws IOException {
        final ReactorDispatcher dispatcher = this.getReactorScheduler();
        dispatcher.invoke(handler);
    }

    /**
     * Hands the handler to the current reactor dispatcher together with a delay.
     *
     * @param delay delay forwarded unchanged to the dispatcher's {@code invoke}
     * @param handler handler passed to the dispatcher's {@code invoke}
     * @throws IOException if the dispatcher's {@code invoke} fails
     */
    void scheduleOnReactorThread(int delay, DispatchHandler handler) throws IOException {
        final ReactorDispatcher dispatcher = this.getReactorScheduler();
        dispatcher.invoke(delay, handler);
    }

    /**
     * Obtains a security token for the audience, sends it over the CBS link once that link has
     * been created, and schedules {@code validityRenewer}.
     *
     * @param sasTokenAudienceURI audience the token is requested for
     * @param retryOnFailure when {@code true}, a failed send is logged and {@code validityRenewer}
     *        is scheduled to run after 5 seconds; when {@code false}, the failure propagates
     *        through the returned future
     * @param validityRenewer task scheduled to run before the token expires (or as the retry)
     * @return a future yielding the scheduled timer task
     */
    CompletableFuture<ScheduledFuture<?>> sendSecurityTokenAndSetRenewTimer(String sasTokenAudienceURI, boolean retryOnFailure, Runnable validityRenewer) {
        TRACE_LOGGER.debug("Sending token for {}", (Object)sasTokenAudienceURI);
        CompletableFuture<SecurityToken> pendingToken = this.clientSettings.getTokenProvider().getSecurityTokenAsync(sasTokenAudienceURI);
        return pendingToken.thenComposeAsync(securityToken -> {
            // Raw types are kept on purpose: the generic signature of sendCBSTokenAsync is not visible here.
            CompletionStage pendingSend = this.cbsLinkCreationFuture.thenComposeAsync(ignored -> CommonRequestResponseOperations.sendCBSTokenAsync(this.cbsLink, Util.adjustServerTimeout(this.clientSettings.getOperationTimeout()), securityToken), (Executor)INTERNAL_THREAD_POOL);
            if (!retryOnFailure) {
                return ((CompletableFuture)pendingSend).thenApply(ignored -> {
                    TRACE_LOGGER.debug("Sent token for {}", (Object)sasTokenAudienceURI);
                    return MessagingFactory.scheduleRenewTimer(securityToken.getValidUntil(), validityRenewer);
                });
            }
            return ((CompletableFuture)pendingSend).handleAsync((ignored, sendFailure) -> {
                if (sendFailure != null) {
                    TRACE_LOGGER.warn("Sending CBS Token for {} failed.", (Object)sasTokenAudienceURI, sendFailure);
                    TRACE_LOGGER.info("Will retry sending CBS Token for {} after {} seconds.", (Object)sasTokenAudienceURI, (Object)5);
                    return Timer.schedule(validityRenewer, Duration.ofSeconds(5L), TimerType.OneTimeRun);
                }
                TRACE_LOGGER.debug("Sent token for {}", (Object)sasTokenAudienceURI);
                return MessagingFactory.scheduleRenewTimer(securityToken.getValidUntil(), validityRenewer);
            }, (Executor)INTERNAL_THREAD_POOL);
        }, (Executor)INTERNAL_THREAD_POOL);
    }

    /**
     * Schedules {@code validityRenewer} to run once, after the renew interval that
     * {@code Util.getTokenRenewIntervalInSeconds} derives from the token's remaining validity.
     *
     * <p>The remaining validity is saturated to the {@code int} range before it is passed on.
     * A plain {@code (int)} cast would wrap around for expiry instants more than about 68 years
     * away from now and produce an arbitrary, possibly negative, value.
     *
     * @param currentTokenValidUntil instant at which the current token expires
     * @param validityRenewer task to run when the token should be renewed
     * @return the scheduled timer task
     */
    private static ScheduledFuture<?> scheduleRenewTimer(Instant currentTokenValidUntil, Runnable validityRenewer) {
        final long secondsUntilExpiry = Duration.between(Instant.now(), currentTokenValidUntil).getSeconds();
        // Saturate instead of truncating; values inside the int range are passed on unchanged.
        final int boundedSecondsUntilExpiry =
                (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, secondsUntilExpiry));
        final int renewInterval = Util.getTokenRenewIntervalInSeconds(boundedSecondsUntilExpiry);
        return Timer.schedule(validityRenewer, Duration.ofSeconds(renewInterval), TimerType.OneTimeRun);
    }

    /**
     * Requests a request-response link for the entity from the management links cache, after
     * calling {@code throwIfClosed} on this factory.
     *
     * @param entityPath path of the entity the link is for
     * @param entityType type of the entity, forwarded to the cache
     * @return the future returned by the management links cache
     */
    CompletableFuture<RequestResponseLink> obtainRequestResponseLinkAsync(String entityPath, MessagingEntityType entityType) {
        throwIfClosed(null);
        return managementLinksCache.obtainRequestResponseLinkAsync(entityPath, entityType);
    }

    /**
     * Releases the entity's request-response link in the management links cache; does nothing
     * when this factory is already closed.
     *
     * @param entityPath path of the entity whose link is released
     */
    void releaseRequestResponseLink(String entityPath) {
        if (this.getIsClosed()) {
            return;
        }
        this.managementLinksCache.releaseRequestResponseLink(entityPath);
    }

    /**
     * Creates the CBS request-response link and stores it in {@code cbsLink}, retrying on
     * failure up to {@code MAX_CBS_LINK_CREATION_ATTEMPTS} attempts in total.
     *
     * <p>The outcome is published through {@code cbsLinkCreationFuture}: it is completed
     * normally once the link exists, and exceptionally (with the last creation failure, if one
     * was recorded) when the attempt limit is exceeded.
     *
     * @return a future for the current creation attempt; already completed when the attempt
     *         limit has been exceeded. Its value is always {@code null}.
     */
    private CompletableFuture<Void> createCBSLinkAsync() {
        if (++this.cbsLinkCreationAttempts > MAX_CBS_LINK_CREATION_ATTEMPTS) {
            Throwable completionEx = this.lastCBSLinkCreationException == null
                    ? new Exception("CBS link creation failed multiple times.")
                    : this.lastCBSLinkCreationException;
            this.cbsLinkCreationFuture.completeExceptionally(completionEx);
            return CompletableFuture.completedFuture(null);
        }

        String requestResponseLinkPath = RequestResponseLink.getCBSNodeLinkPath();
        TRACE_LOGGER.info("Creating CBS link to {}", requestResponseLinkPath);
        // Typed as CompletableFuture<Void> so it matches the declared return type; the raw
        // CompletionStage local used before could not be returned from this method.
        CompletableFuture<Void> createAndAssignCbsLink = RequestResponseLink
                .createAsync(this, this.getClientId() + "-cbs", requestResponseLinkPath, null, null)
                .handleAsync((createdLink, ex) -> {
                    if (ex == null) {
                        TRACE_LOGGER.info("Created CBS link to {}", requestResponseLinkPath);
                        this.cbsLink = createdLink;
                        this.cbsLinkCreationFuture.complete(null);
                    } else {
                        this.lastCBSLinkCreationException = ExceptionUtil.extractAsyncCompletionCause(ex);
                        TRACE_LOGGER.warn("Creating CBS link to {} failed. Attempts '{}'", requestResponseLinkPath, this.cbsLinkCreationAttempts);
                        // Retry; the attempt counter at the top of this method bounds the recursion.
                        this.createCBSLinkAsync();
                    }
                    return null;
                }, INTERNAL_THREAD_POOL);
        return createAndAssignCbsLink;
    }

    /**
     * Waits for the future and unwraps its failure into a {@code ServiceBusException}.
     *
     * @param future future to wait on
     * @param <T> result type of the future
     * @return the future's result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ServiceBusException the cause of the failure if it already is a
     *         {@code ServiceBusException}, otherwise a new one wrapping that cause
     */
    private static <T> T completeFuture(CompletableFuture<T> future) throws InterruptedException, ServiceBusException {
        try {
            return future.get();
        } catch (ExecutionException executionFailure) {
            final Throwable rootCause = executionFailure.getCause();
            if (!(rootCause instanceof ServiceBusException)) {
                throw new ServiceBusException(true, rootCause);
            }
            throw (ServiceBusException) rootCause;
        }
    }

    /**
     * Runnable that drives one reactor instance: it processes reactor events until the thread
     * is interrupted, the reactor reports that there is nothing more to process, or the factory
     * is closed. The reactor is always freed when the loop ends.
     */
    private class RunReactor implements Runnable {
        /** Reactor captured from the enclosing factory when this runnable is created. */
        private final Reactor drivenReactor;

        public RunReactor() {
            this.drivenReactor = MessagingFactory.this.getReactor();
        }

        /**
         * Runs the reactor loop. A {@code HandlerException} escaping the loop is translated into
         * a {@code ServiceBusException} (or a {@code CommunicationException} when the cause is
         * an {@code UnresolvedAddressException}) and reported to {@code onReactorError}.
         */
        @Override
        public void run() {
            TRACE_LOGGER.info("starting reactor instance.");
            try {
                this.drivenReactor.setTimeout(3141L);
                this.drivenReactor.start();
                boolean moreToProcess = true;
                while (!Thread.interrupted() && moreToProcess) {
                    if (MessagingFactory.this.getIsClosed()) {
                        TRACE_LOGGER.info("Gracefully releasing reactor thread as messaging factory is closed");
                        break;
                    }
                    moreToProcess = this.drivenReactor.process();
                }
                TRACE_LOGGER.info("Stopping reactor");
                this.drivenReactor.stop();
            } catch (HandlerException handlerException) {
                Throwable cause = handlerException.getCause();
                if (cause == null) {
                    cause = handlerException;
                }
                TRACE_LOGGER.warn("UnHandled exception while processing events in reactor:", handlerException);

                // Prefer the cause's message, then the handler exception's, then a generic text.
                final String message;
                if (!StringUtil.isNullOrEmpty(cause.getMessage())) {
                    message = cause.getMessage();
                } else if (!StringUtil.isNullOrEmpty(handlerException.getMessage())) {
                    message = handlerException.getMessage();
                } else {
                    message = "Reactor encountered unrecoverable error";
                }

                ServiceBusException reportedException = new ServiceBusException(true, String.format(Locale.US, "%s, %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()), cause);
                if (cause instanceof UnresolvedAddressException) {
                    reportedException = new CommunicationException(String.format(Locale.US, "%s. This is usually caused by incorrect hostname or network configuration. Please check to see if namespace information is correct. %s", message, ExceptionUtil.getTrackingIDAndTimeToLog()), cause);
                }
                MessagingFactory.this.onReactorError(reportedException);
            } finally {
                this.drivenReactor.free();
            }
        }
    }
}

