/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.jms.transports.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestEncoder;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.transports.TransportOptions;
import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyWsTransport
extends NettyTcpTransport {
    private static final Logger LOG = LoggerFactory.getLogger(NettyWsTransport.class);
    private static final String AMQP_SUB_PROTOCOL = "amqp";
    private ScheduledFuture<?> handshakeTimeoutFuture;

    public NettyWsTransport(URI remoteLocation, TransportOptions options, boolean secure) {
        this(null, remoteLocation, options, secure);
    }

    public NettyWsTransport(TransportListener listener, URI remoteLocation, TransportOptions options, boolean secure) {
        super(listener, remoteLocation, options, secure);
    }

    @Override
    public void write(ByteBuf output) throws IOException {
        this.checkConnected();
        int length = output.readableBytes();
        if (length == 0) {
            return;
        }
        LOG.trace("Attempted write of: {} bytes", (Object)length);
        this.channel.write(new BinaryWebSocketFrame(output), this.channel.voidPromise());
    }

    @Override
    public void writeAndFlush(ByteBuf output) throws IOException {
        this.checkConnected();
        int length = output.readableBytes();
        if (length == 0) {
            return;
        }
        LOG.trace("Attempted write and flush of: {} bytes", (Object)length);
        this.channel.writeAndFlush(new BinaryWebSocketFrame(output), this.channel.voidPromise());
    }

    @Override
    protected ChannelInboundHandlerAdapter createChannelHandler() {
        return new NettyWebSocketTransportHandler();
    }

    @Override
    protected void addAdditionalHandlers(ChannelPipeline pipeline) {
        pipeline.addLast(new HttpResponseDecoder());
        pipeline.addLast(new HttpRequestEncoder());
        pipeline.addLast(new HttpObjectAggregator(8192));
    }

    @Override
    protected void handleConnected(Channel channel) throws Exception {
        LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", (Object)channel);
    }

    @Override
    protected void handleChannelInactive(Channel channel) throws Exception {
        try {
            if (this.handshakeTimeoutFuture != null) {
                this.handshakeTimeoutFuture.cancel(false);
            }
        }
        finally {
            super.handleChannelInactive(channel);
        }
    }

    private class NettyWebSocketTransportHandler
    extends NettyTcpTransport.NettyDefaultHandler<Object> {
        private final WebSocketClientHandshaker handshaker;

        public NettyWebSocketTransportHandler() {
            DefaultHttpHeaders headers = new DefaultHttpHeaders();
            NettyWsTransport.this.getTransportOptions().getHttpHeaders().forEach((key, value) -> headers.set((String)key, value));
            this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(NettyWsTransport.this.getRemoteLocation(), WebSocketVersion.V13, NettyWsTransport.AMQP_SUB_PROTOCOL, true, headers, NettyWsTransport.this.getMaxFrameSize());
        }

        @Override
        public void channelActive(ChannelHandlerContext context) throws Exception {
            this.handshaker.handshake(context.channel());
            NettyWsTransport.this.handshakeTimeoutFuture = context.executor().schedule(() -> {
                LOG.trace("WebSocket handshake timed out! Channel is {}", (Object)context.channel());
                if (!this.handshaker.isHandshakeComplete()) {
                    NettyWsTransport.super.handleException(NettyWsTransport.this.channel, new IOException("WebSocket handshake timed out"));
                }
            }, (long)NettyWsTransport.this.getTransportOptions().getConnectTimeout(), TimeUnit.MILLISECONDS);
            super.channelActive(context);
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
            LOG.trace("New data read: incoming: {}", message);
            Channel ch = ctx.channel();
            if (!this.handshaker.isHandshakeComplete()) {
                this.handshaker.finishHandshake(ch, (FullHttpResponse)message);
                LOG.trace("WebSocket Client connected! {}", (Object)ctx.channel());
                if (NettyWsTransport.this.handshakeTimeoutFuture.cancel(false)) {
                    NettyWsTransport.super.handleConnected(ch);
                }
                return;
            }
            if (message instanceof FullHttpResponse) {
                FullHttpResponse response = (FullHttpResponse)message;
                throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ")");
            }
            WebSocketFrame frame = (WebSocketFrame)message;
            if (frame instanceof TextWebSocketFrame) {
                TextWebSocketFrame textFrame = (TextWebSocketFrame)frame;
                LOG.warn("WebSocket Client received message: " + textFrame.text());
                ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
            } else if (frame instanceof BinaryWebSocketFrame) {
                BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame)frame;
                LOG.trace("WebSocket Client received data: {} bytes", (Object)binaryFrame.content().readableBytes());
                NettyWsTransport.this.listener.onData(binaryFrame.content());
            } else if (frame instanceof ContinuationWebSocketFrame) {
                ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame)frame;
                LOG.trace("WebSocket Client received data continuation: {} bytes", (Object)continuationFrame.content().readableBytes());
                NettyWsTransport.this.listener.onData(continuationFrame.content());
            } else if (frame instanceof PingWebSocketFrame) {
                LOG.trace("WebSocket Client received ping, response with pong");
                ch.write(new PongWebSocketFrame(frame.content()));
            } else if (frame instanceof CloseWebSocketFrame) {
                LOG.trace("WebSocket Client received closing");
                ch.close();
            }
        }
    }
}

