/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.vertx.websocket;

import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.WebSocketBase;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.vertx.websocket.VertxWebsocketEndpoint;
import org.apache.camel.component.vertx.websocket.VertxWebsocketResultHandler;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VertxWebsocketProducer
extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(VertxWebsocketProducer.class);

    public VertxWebsocketProducer(VertxWebsocketEndpoint endpoint) {
        super(endpoint);
    }

    @Override
    public VertxWebsocketEndpoint getEndpoint() {
        return (VertxWebsocketEndpoint)super.getEndpoint();
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            Message in = exchange.getIn();
            Object message = in.getBody();
            if (message == null) {
                callback.done(true);
                return true;
            }
            Map<String, WebSocketBase> connectedPeers = this.getConnectedPeers(exchange);
            VertxWebsocketResultHandler vertxWebsocketResultHandler = new VertxWebsocketResultHandler(exchange, callback, connectedPeers.keySet());
            if (connectedPeers.isEmpty()) {
                callback.done(true);
            }
            connectedPeers.forEach((connectionKey, webSocket) -> {
                Handler<AsyncResult<Void>> handler = result -> {
                    if (!result.succeeded()) {
                        vertxWebsocketResultHandler.onError((String)connectionKey, result.cause());
                    }
                    vertxWebsocketResultHandler.onResult((String)connectionKey);
                };
                if (webSocket != null) {
                    if (webSocket.isClosed()) {
                        LOG.warn("WebSocket peer connection with key {} is already closed", connectionKey);
                        vertxWebsocketResultHandler.onResult((String)connectionKey);
                    } else if (message instanceof String) {
                        webSocket.writeTextMessage((String)message, handler);
                    } else if (message instanceof byte[]) {
                        webSocket.writeBinaryMessage(Buffer.buffer((byte[])message), handler);
                    } else {
                        webSocket.writeTextMessage(in.getBody(String.class), handler);
                    }
                } else {
                    LOG.warn("No WebSocket peer connection found for connection key {}", connectionKey);
                    vertxWebsocketResultHandler.onResult((String)connectionKey);
                }
            });
            return false;
        }
        catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    private Map<String, WebSocketBase> getConnectedPeers(Exchange exchange) throws Exception {
        VertxWebsocketEndpoint endpoint = this.getEndpoint();
        Map<String, ServerWebSocket> peers = endpoint.findPeersForHostPort();
        HashMap<String, WebSocketBase> connectedPeers = new HashMap<String, WebSocketBase>();
        Message message = exchange.getMessage();
        boolean isSendToAll = message.getHeader("CamelVertxWebsocket.sendToAll", endpoint.getConfiguration().isSendToAll(), Boolean.TYPE);
        if (isSendToAll) {
            if (ObjectHelper.isNotEmpty(peers)) {
                connectedPeers.putAll(peers);
            }
        } else {
            String connectionKey = message.getHeader("CamelVertxWebsocket.connectionKey", String.class);
            if (connectionKey != null && ObjectHelper.isNotEmpty(peers)) {
                Stream.of(connectionKey.split(",")).filter(peers::containsKey).forEach(key -> connectedPeers.put((String)key, endpoint.findPeerForConnectionKey((String)key)));
            } else {
                connectedPeers.put(UUID.randomUUID().toString(), endpoint.getWebSocket(exchange));
            }
        }
        return connectedPeers;
    }
}

