/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.salesforce.internal.streaming;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.CamelException;
import org.apache.camel.component.salesforce.SalesforceComponent;
import org.apache.camel.component.salesforce.SalesforceConsumer;
import org.apache.camel.component.salesforce.SalesforceHttpClient;
import org.apache.camel.component.salesforce.api.SalesforceException;
import org.apache.camel.component.salesforce.internal.SalesforceSession;
import org.apache.camel.support.ServiceSupport;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SubscriptionHelper
extends ServiceSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class);
    private static final int CONNECT_TIMEOUT = 110;
    private static final int CHANNEL_TIMEOUT = 40;
    private static final String FAILURE_FIELD = "failure";
    private static final String EXCEPTION_FIELD = "exception";
    private static final int DISCONNECT_INTERVAL = 5000;
    private final SalesforceComponent component;
    private final SalesforceSession session;
    private final BayeuxClient client;
    private final long timeout = 60000L;
    private final Map<SalesforceConsumer, ClientSessionChannel.MessageListener> listenerMap;
    private final long maxBackoff;
    private final long backoffIncrement;
    private ClientSessionChannel.MessageListener handshakeListener;
    private ClientSessionChannel.MessageListener connectListener;
    private ClientSessionChannel.MessageListener disconnectListener;
    private volatile String handshakeError;
    private volatile Exception handshakeException;
    private volatile String connectError;
    private volatile Exception connectException;
    private volatile boolean reconnecting;
    private final AtomicLong restartBackoff;

    public SubscriptionHelper(SalesforceComponent component) throws Exception {
        this.component = component;
        this.session = component.getSession();
        this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>();
        this.client = this.createClient();
        this.restartBackoff = new AtomicLong(0L);
        this.backoffIncrement = component.getConfig().getBackoffIncrement();
        this.maxBackoff = component.getConfig().getMaxBackoff();
    }

    protected void doStart() throws Exception {
        this.handshakeError = null;
        this.handshakeException = null;
        this.connectError = null;
        this.connectException = null;
        if (this.handshakeListener == null) {
            this.handshakeListener = new ClientSessionChannel.MessageListener(){

                @Override
                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_HANDSHAKE]: {}", (Object)message);
                    if (!message.isSuccessful()) {
                        LOG.warn("Handshake failure: {}", (Object)message);
                        SubscriptionHelper.this.handshakeError = (String)message.get("error");
                        SubscriptionHelper.this.handshakeException = SubscriptionHelper.this.getFailure(message);
                        if (SubscriptionHelper.this.handshakeError != null && SubscriptionHelper.this.handshakeError.startsWith("401::")) {
                            try {
                                LOG.info("Refreshing OAuth token...");
                                SubscriptionHelper.this.session.login(SubscriptionHelper.this.session.getAccessToken());
                                LOG.info("Refreshed OAuth token for re-handshake");
                            }
                            catch (SalesforceException e) {
                                LOG.error("Error renewing OAuth token on 401 error: " + e.getMessage(), (Throwable)((Object)e));
                            }
                        }
                        SubscriptionHelper.this.restartClient();
                    } else if (!SubscriptionHelper.this.listenerMap.isEmpty()) {
                        SubscriptionHelper.this.reconnecting = true;
                    }
                }
            };
        }
        this.client.getChannel("/meta/handshake").addListener(this.handshakeListener);
        if (this.connectListener == null) {
            this.connectListener = new ClientSessionChannel.MessageListener(){

                @Override
                public void onMessage(ClientSessionChannel channel, Message message) {
                    LOG.debug("[CHANNEL:META_CONNECT]: {}", (Object)message);
                    if (!message.isSuccessful()) {
                        LOG.warn("Connect failure: {}", (Object)message);
                        SubscriptionHelper.this.connectError = (String)message.get("error");
                        SubscriptionHelper.this.connectException = SubscriptionHelper.this.getFailure(message);
                    } else if (SubscriptionHelper.this.reconnecting) {
                        SubscriptionHelper.this.reconnecting = false;
                        LOG.debug("Refreshing subscriptions to {} channels on reconnect", (Object)SubscriptionHelper.this.listenerMap.size());
                        HashMap map = new HashMap();
                        map.putAll(SubscriptionHelper.this.listenerMap);
                        SubscriptionHelper.this.listenerMap.clear();
                        for (Map.Entry entry : map.entrySet()) {
                            SalesforceConsumer consumer = (SalesforceConsumer)((Object)entry.getKey());
                            String topicName = consumer.getTopicName();
                            SubscriptionHelper.this.subscribe(topicName, consumer);
                        }
                    }
                }
            };
        }
        this.client.getChannel("/meta/connect").addListener(this.connectListener);
        if (this.disconnectListener == null) {
            this.disconnectListener = new ClientSessionChannel.MessageListener(){

                @Override
                public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
                    SubscriptionHelper.this.restartClient();
                }
            };
        }
        this.client.getChannel("/meta/disconnect").addListener(this.disconnectListener);
        this.client.handshake();
        long waitMs = TimeUnit.MILLISECONDS.convert(110L, TimeUnit.SECONDS);
        if (!this.client.waitFor(waitMs, BayeuxClient.State.CONNECTED, new BayeuxClient.State[0])) {
            if (this.handshakeException != null) {
                throw new CamelException(String.format("Exception during HANDSHAKE: %s", this.handshakeException.getMessage()), (Throwable)this.handshakeException);
            }
            if (this.handshakeError != null) {
                throw new CamelException(String.format("Error during HANDSHAKE: %s", this.handshakeError));
            }
            if (this.connectException != null) {
                throw new CamelException(String.format("Exception during CONNECT: %s", this.connectException.getMessage()), (Throwable)this.connectException);
            }
            if (this.connectError != null) {
                throw new CamelException(String.format("Error during CONNECT: %s", this.connectError));
            }
            throw new CamelException(String.format("Handshake request timeout after %s seconds", 110));
        }
    }

    private void restartClient() {
        SalesforceHttpClient httpClient = this.component.getConfig().getHttpClient();
        httpClient.getExecutor().execute(new Runnable(){

            @Override
            public void run() {
                LOG.info("Restarting on unexpected disconnect from Salesforce...");
                boolean abort = false;
                LOG.debug("Waiting to disconnect...");
                while (!SubscriptionHelper.this.client.isDisconnected()) {
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e) {
                        LOG.error("Aborting restart on interrupt!");
                        abort = true;
                    }
                }
                if (!abort) {
                    long backoff = SubscriptionHelper.this.restartBackoff.getAndAdd(SubscriptionHelper.this.backoffIncrement);
                    if (backoff > SubscriptionHelper.this.maxBackoff) {
                        LOG.error("Restart aborted after exceeding {} msecs backoff", (Object)SubscriptionHelper.this.maxBackoff);
                        abort = true;
                    } else {
                        LOG.debug("Pausing for {} msecs before restart attempt", (Object)backoff);
                        try {
                            Thread.sleep(backoff);
                        }
                        catch (InterruptedException e) {
                            LOG.error("Aborting restart on interrupt!");
                            abort = true;
                        }
                    }
                    if (!abort) {
                        Object lastError = new SalesforceException("Unknown error", null);
                        try {
                            SubscriptionHelper.this.doStop();
                            SubscriptionHelper.this.doStart();
                        }
                        catch (Exception e) {
                            LOG.error("Error restarting: " + e.getMessage(), (Throwable)e);
                            lastError = e;
                        }
                        if (SubscriptionHelper.this.client.isHandshook()) {
                            LOG.info("Successfully restarted!");
                            SubscriptionHelper.this.restartBackoff.set(SubscriptionHelper.this.client.getBackoffIncrement());
                        } else {
                            LOG.error("Failed to restart after pausing for {} msecs", (Object)backoff);
                            if (backoff + SubscriptionHelper.this.backoffIncrement > SubscriptionHelper.this.maxBackoff) {
                                String abortMsg = "Aborting restart attempt due to: " + ((Throwable)lastError).getMessage();
                                SalesforceException ex = new SalesforceException(abortMsg, (Throwable)lastError);
                                for (SalesforceConsumer consumer : SubscriptionHelper.this.listenerMap.keySet()) {
                                    consumer.handleException(abortMsg, (Throwable)((Object)ex));
                                }
                            }
                        }
                    }
                }
            }
        });
    }

    private Exception getFailure(Message message) {
        Exception exception = null;
        if (message.get(EXCEPTION_FIELD) != null) {
            exception = (Exception)message.get(EXCEPTION_FIELD);
        } else if (message.get(FAILURE_FIELD) != null) {
            exception = (Exception)((Map)message.get(FAILURE_FIELD)).get(EXCEPTION_FIELD);
        }
        return exception;
    }

    protected void doStop() throws Exception {
        this.client.getChannel("/meta/disconnect").removeListener(this.disconnectListener);
        this.client.getChannel("/meta/connect").removeListener(this.connectListener);
        this.client.getChannel("/meta/handshake").removeListener(this.handshakeListener);
        boolean disconnected = this.client.disconnect(60000L);
        if (!disconnected) {
            LOG.warn("Could not disconnect client connected to: {} after: {} msec.", (Object)this.getEndpointUrl(), (Object)60000L);
        }
    }

    private BayeuxClient createClient() throws Exception {
        SalesforceHttpClient httpClient = this.component.getConfig().getHttpClient();
        HashMap<String, Long> options = new HashMap<String, Long>();
        options.put("maxNetworkDelay", httpClient.getTimeout());
        if (this.session.getAccessToken() == null) {
            this.session.login(null);
        }
        LongPollingTransport transport = new LongPollingTransport(options, (HttpClient)httpClient){

            @Override
            protected void customize(Request request) {
                super.customize(request);
                request.getHeaders().put(HttpHeader.AUTHORIZATION, "OAuth " + SubscriptionHelper.this.session.getAccessToken());
            }
        };
        BayeuxClient client = new BayeuxClient(this.getEndpointUrl(), transport, new ClientTransport[0]);
        return client;
    }

    public void subscribe(final String topicName, final SalesforceConsumer consumer) {
        final String channelName = this.getChannelName(topicName);
        LOG.info("Subscribing to channel {}...", (Object)channelName);
        final ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener(){

            @Override
            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("Received Message: {}", (Object)message);
                consumer.processMessage(channel, message);
            }
        };
        ClientSessionChannel clientChannel = this.client.getChannel(channelName);
        ClientSessionChannel.MessageListener subscriptionListener = new ClientSessionChannel.MessageListener(){

            @Override
            public void onMessage(ClientSessionChannel channel, Message message) {
                LOG.debug("[CHANNEL:META_SUBSCRIBE]: {}", (Object)message);
                String subscribedChannelName = message.get("subscription").toString();
                if (channelName.equals(subscribedChannelName)) {
                    if (!message.isSuccessful()) {
                        String error = (String)message.get("error");
                        if (error == null) {
                            error = "Missing error message";
                        }
                        Exception failure = SubscriptionHelper.this.getFailure(message);
                        String msg = String.format("Error subscribing to %s: %s", topicName, failure != null ? failure.getMessage() : error);
                        consumer.handleException(msg, (Throwable)((Object)new SalesforceException(msg, failure)));
                    } else {
                        LOG.info("Subscribed to channel {}", (Object)subscribedChannelName);
                        SubscriptionHelper.this.listenerMap.put(consumer, listener);
                    }
                    SubscriptionHelper.this.client.getChannel("/meta/subscribe").removeListener(this);
                }
            }
        };
        this.client.getChannel("/meta/subscribe").addListener(subscriptionListener);
        clientChannel.subscribe(listener);
    }

    private String getChannelName(String topicName) {
        return "/topic/" + topicName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void unsubscribe(String topicName, SalesforceConsumer consumer) throws CamelException {
        block6: {
            final String channelName = this.getChannelName(topicName);
            final CountDownLatch latch = new CountDownLatch(1);
            final String[] unsubscribeError = new String[]{null};
            final Exception[] unsubscribeFailure = new Exception[]{null};
            ClientSessionChannel.MessageListener unsubscribeListener = new ClientSessionChannel.MessageListener(){

                @Override
                public void onMessage(ClientSessionChannel channel, Message message) {
                    String unsubscribedChannelName;
                    LOG.debug("[CHANNEL:META_UNSUBSCRIBE]: {}", (Object)message);
                    Object subscription = message.get("subscription");
                    if (subscription != null && channelName.equals(unsubscribedChannelName = subscription.toString())) {
                        if (!message.isSuccessful()) {
                            unsubscribeError[0] = (String)message.get("error");
                            unsubscribeFailure[0] = SubscriptionHelper.this.getFailure(message);
                        } else {
                            LOG.info("Unsubscribed from channel {}", (Object)unsubscribedChannelName);
                        }
                        latch.countDown();
                    }
                }
            };
            this.client.getChannel("/meta/unsubscribe").addListener(unsubscribeListener);
            try {
                ClientSessionChannel.MessageListener listener = this.listenerMap.remove((Object)consumer);
                if (listener == null) break block6;
                LOG.info("Unsubscribing from channel {}...", (Object)channelName);
                ClientSessionChannel clientChannel = this.client.getChannel(channelName);
                clientChannel.unsubscribe(listener);
                try {
                    if (!latch.await(40L, TimeUnit.SECONDS)) {
                        String message = unsubscribeFailure[0] != null ? String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeFailure[0].getMessage()) : (unsubscribeError[0] != null ? String.format("Error unsubscribing from topic %s: %s", topicName, unsubscribeError[0]) : String.format("Timeout error unsubscribing from topic %s after %s seconds", topicName, 40));
                        throw new CamelException(message, (Throwable)unsubscribeFailure[0]);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            finally {
                this.client.getChannel("/meta/unsubscribe").removeListener(unsubscribeListener);
            }
        }
    }

    public String getEndpointUrl() {
        return this.component.getSession().getInstanceUrl() + "/cometd/" + this.component.getConfig().getApiVersion();
    }
}

