/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.ahc.ws;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.ahc.AhcEndpoint;
import org.apache.camel.component.ahc.ws.WsComponent;
import org.apache.camel.component.ahc.ws.WsConsumer;
import org.apache.camel.component.ahc.ws.WsProducer;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ws.WebSocket;
import org.asynchttpclient.ws.WebSocketListener;
import org.asynchttpclient.ws.WebSocketUpgradeHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Endpoint for the {@code ahc-ws} / {@code ahc-wss} schemes.
 *
 * <p>The endpoint holds a single {@link WebSocket} that is (re)opened on demand and a set of
 * registered {@link WsConsumer}s. Every text or binary frame received on the websocket is handed
 * to each registered consumer; errors are handed to them only when
 * {@link #isSendMessageOnError()} is {@code true}.
 */
@UriEndpoint(firstVersion="2.14.0", scheme="ahc-ws,ahc-wss", extendsScheme="ahc,ahc", title="Async HTTP Client (AHC) Websocket,Async HTTP Client (AHC) Secure Websocket", syntax="ahc-ws:httpUri", category={Category.WEBSOCKET})
public class WsEndpoint
extends AhcEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(WsEndpoint.class);

    /**
     * Consumers that receive inbound frames.
     *
     * <p>The listener callbacks iterate this set while {@link #connect(WsConsumer)} and
     * {@link #disconnect(WsConsumer)} mutate it. NOTE(review): those presumably run on different
     * threads (client callback threads vs. consumer lifecycle) - a copy-on-write set lets the
     * iteration proceed on a snapshot instead of failing with ConcurrentModificationException.
     */
    private final Set<WsConsumer> consumers = new CopyOnWriteArraySet<>();

    /** Single listener instance attached to every websocket this endpoint opens. */
    private final WsListener listener = new WsListener();

    /** Current websocket, or {@code null} when none has been opened or it has been discarded. */
    private transient WebSocket websocket;

    /** Producer-side option; only stored here. NOTE(review): presumably read by WsProducer - confirm. */
    @UriParam(label="producer")
    private boolean useStreaming;

    /** When {@code true}, errors reported to the listener are forwarded to every consumer. */
    @UriParam(label="consumer")
    private boolean sendMessageOnError;

    /**
     * Creates the endpoint.
     *
     * @param endpointUri the endpoint URI
     * @param component the owning component
     */
    public WsEndpoint(String endpointUri, WsComponent component) {
        super(endpointUri, component, null);
    }

    /** Returns the owning component, narrowed to {@link WsComponent}. */
    @Override
    public WsComponent getComponent() {
        return (WsComponent)super.getComponent();
    }

    /** Creates a new {@link WsProducer} bound to this endpoint. */
    @Override
    public Producer createProducer() throws Exception {
        return new WsProducer(this);
    }

    /**
     * Creates a new {@link WsConsumer} for the given processor and applies the endpoint's
     * consumer configuration to it.
     *
     * @param processor the processor that will receive exchanges from the consumer
     * @return the configured consumer
     */
    @Override
    public Consumer createConsumer(Processor processor) throws Exception {
        WsConsumer consumer = new WsConsumer(this, processor);
        configureConsumer(consumer);
        return consumer;
    }

    /**
     * Returns the websocket, connecting first when none is open.
     *
     * <p>The connect-if-needed step and the read of the field both happen under this endpoint's
     * monitor, so the caller gets the socket that this call ensured is connected.
     *
     * @return the current websocket
     * @throws Exception if connecting fails
     */
    WebSocket getWebSocket() throws Exception {
        synchronized (this) {
            reConnect();
            return websocket;
        }
    }

    /** Replaces the websocket held by this endpoint. */
    void setWebSocket(WebSocket websocket) {
        this.websocket = websocket;
    }

    public boolean isUseStreaming() {
        return useStreaming;
    }

    public void setUseStreaming(boolean useStreaming) {
        this.useStreaming = useStreaming;
    }

    public boolean isSendMessageOnError() {
        return sendMessageOnError;
    }

    public void setSendMessageOnError(boolean sendMessageOnError) {
        this.sendMessageOnError = sendMessageOnError;
    }

    /**
     * Creates the HTTP client used for the websocket upgrade.
     *
     * @param config client configuration; when {@code null} a default configuration is built
     * @return a new {@link DefaultAsyncHttpClient}
     */
    @Override
    protected AsyncHttpClient createClient(AsyncHttpClientConfig config) {
        AsyncHttpClientConfig effectiveConfig = config != null
                ? config
                : new DefaultAsyncHttpClientConfig.Builder().build();
        return new DefaultAsyncHttpClient(effectiveConfig);
    }

    /**
     * Opens the websocket: issues a GET against the endpoint's HTTP URI with an upgrade handler
     * carrying this endpoint's listener, and blocks until the upgrade result is available.
     *
     * @throws ExecutionException if the upgrade request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void connect() throws ExecutionException, InterruptedException {
        String uri = getHttpUri().toASCIIString();
        LOG.debug("Connecting to {}", uri);
        WebSocketUpgradeHandler upgradeHandler =
                new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build();
        websocket = (WebSocket)getClient().prepareGet(uri).execute(upgradeHandler).get();
    }

    /**
     * Closes the websocket if one is held and open, then delegates to the superclass.
     */
    @Override
    protected void doStop() throws Exception {
        if (websocket != null && websocket.isOpen()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Disconnecting from {}", getHttpUri().toASCIIString());
            }
            // Detach the listener before closing; presumably so that this deliberate close does
            // not reach onClose, which would reconnect.
            websocket.removeWebSocketListener(listener);
            websocket.sendCloseFrame();
            websocket = null;
        }
        super.doStop();
    }

    /**
     * Registers a consumer for inbound frames and makes sure the websocket is connected.
     *
     * @param wsConsumer the consumer to register
     * @throws ExecutionException if connecting fails
     * @throws InterruptedException if the calling thread is interrupted while connecting
     */
    void connect(WsConsumer wsConsumer) throws ExecutionException, InterruptedException {
        consumers.add(wsConsumer);
        reConnect();
    }

    /**
     * Unregisters a consumer; the websocket itself is left untouched.
     *
     * @param wsConsumer the consumer to remove
     */
    void disconnect(WsConsumer wsConsumer) {
        consumers.remove(wsConsumer);
    }

    /**
     * Connects when there is no websocket or the current one is not open; otherwise does nothing.
     *
     * @throws ExecutionException if connecting fails
     * @throws InterruptedException if the calling thread is interrupted while connecting
     */
    void reConnect() throws ExecutionException, InterruptedException {
        if (websocket == null || !websocket.isOpen()) {
            String uri = getHttpUri().toASCIIString();
            LOG.info("Reconnecting websocket: {}", uri);
            connect();
        }
    }

    /**
     * Listener attached to the endpoint's websocket. Dispatches inbound frames to the registered
     * consumers, answers pings, and reconnects when the socket is closed.
     */
    class WsListener
    implements WebSocketListener {

        @Override
        public void onOpen(WebSocket websocket) {
            LOG.debug("Websocket opened");
        }

        /**
         * Discards the endpoint's current websocket (if any) and reconnects. A failure to
         * reconnect is logged and passed to the endpoint's exception handler when one is set.
         */
        @Override
        public void onClose(WebSocket socket, int code, String reason) {
            LOG.debug("websocket closed - reconnecting");
            try {
                // Work on a snapshot so a concurrent reset of the field cannot cause an NPE here.
                WebSocket current = WsEndpoint.this.websocket;
                if (current != null) {
                    current.removeWebSocketListener(WsEndpoint.this.listener);
                    current.sendCloseFrame();
                    WsEndpoint.this.websocket = null;
                }
                WsEndpoint.this.reConnect();
            }
            catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                }
                LOG.warn("Error re-connecting to websocket", e);
                ExceptionHandler exceptionHandler = WsEndpoint.this.getExceptionHandler();
                if (exceptionHandler != null) {
                    exceptionHandler.handleException("Error re-connecting to websocket", e);
                }
            }
        }

        /** Logs the error and, when {@code sendMessageOnError} is set, forwards it to every consumer. */
        @Override
        public void onError(Throwable t) {
            LOG.debug("websocket on error", t);
            if (!WsEndpoint.this.isSendMessageOnError()) {
                return;
            }
            for (WsConsumer consumer : WsEndpoint.this.consumers) {
                consumer.sendMessage(t);
            }
        }

        /** Forwards a binary frame's payload to every registered consumer. */
        @Override
        public void onBinaryFrame(byte[] message, boolean finalFragment, int rsv) {
            LOG.debug("Received message --> {}", (Object)message);
            for (WsConsumer consumer : WsEndpoint.this.consumers) {
                consumer.sendMessage(message);
            }
        }

        /** Forwards a text frame's payload to every registered consumer. */
        @Override
        public void onTextFrame(String message, boolean finalFragment, int rsv) {
            LOG.debug("Received message --> {}", message);
            for (WsConsumer consumer : WsEndpoint.this.consumers) {
                consumer.sendMessage(message);
            }
        }

        /**
         * Answers a ping with a pong carrying the same payload. Does nothing when the endpoint
         * currently holds no websocket (the field is null before the first connect completes and
         * after a close), rather than failing with a NullPointerException.
         */
        @Override
        public void onPingFrame(byte[] payload) {
            LOG.debug("Received ping --> {}", (Object)payload);
            WebSocket current = WsEndpoint.this.websocket;
            if (current != null) {
                current.sendPongFrame(payload);
            }
        }
    }
}

