/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.ClientCnxnSocket;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.NettyUtils;
import org.apache.zookeeper.common.X509Exception;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientCnxnSocketNetty
extends ClientCnxnSocket {
    private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class);

    /** Event loop group created in the constructor and shut down by {@code close()}. */
    private final EventLoopGroup eventLoopGroup;

    /** Currently established channel; assigned when a connect attempt succeeds, nulled by {@code cleanup()}. */
    private Channel channel;

    /** Latch replaced on every {@code connect()}; counted down when the attempt completes or on closing. */
    private CountDownLatch firstConnect;

    /** Pending connect attempt; non-null only between {@code connect()} and its completion or cancellation. */
    private ChannelFuture connectFuture;

    /** Guards the {@code channel} / {@code connectFuture} transitions in connect, cleanup and isConnected. */
    private final Lock connectLock = new ReentrantLock();

    /** Set by the channel handler when the channel goes inactive or fails; reset on a successful connect. */
    private final AtomicBoolean disconnected = new AtomicBoolean();

    /** True while {@code doTransport()} must wait on {@link #waitSasl} instead of polling the outgoing queue. */
    private final AtomicBoolean needSasl = new AtomicBoolean();

    /** Permit released by {@code saslCompleted()} and by wake-ups while {@link #needSasl} is set. */
    private final Semaphore waitSasl = new Semaphore(0);

    /**
     * Optional allocator override applied to every bootstrap; {@code null} means "use the default".
     * The diamond operator is required here: an {@code AtomicReference<Object>} is not assignable to
     * {@code AtomicReference<ByteBufAllocator>}.
     */
    private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR = new AtomicReference<>(null);

    /** Write-completion listener that bumps {@code sentCount} for every successfully written packet. */
    private final GenericFutureListener<Future<Void>> onSendPktDoneListener = f -> {
        if (f.isSuccess()) {
            this.sentCount.getAndIncrement();
        }
    };

    ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
        this.clientConfig = clientConfig;
        this.eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1);
        this.initProperties();
    }

    /**
     * Reports whether this socket is connected or in the middle of connecting.
     *
     * @return {@code true} when a channel exists or a connect attempt is still pending;
     *         the check is made while holding {@code connectLock}
     */
    @Override
    boolean isConnected() {
        connectLock.lock();
        try {
            boolean idle = channel == null && connectFuture == null;
            return !idle;
        } finally {
            connectLock.unlock();
        }
    }

    /**
     * Applies the allocator override, if one has been installed, to the given bootstrap.
     *
     * @param bootstrap bootstrap under construction
     * @return the bootstrap with {@code ChannelOption.ALLOCATOR} set when an override is present,
     *         otherwise the bootstrap unchanged
     */
    private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
        ByteBufAllocator override = TEST_ALLOCATOR.get();
        if (override == null) {
            return bootstrap;
        }
        return bootstrap.option(ChannelOption.ALLOCATOR, override);
    }

    /**
     * Starts an asynchronous connect to {@code addr}.
     *
     * <p>A fresh {@code firstConnect} latch is installed, a bootstrap is built with
     * {@code SO_LINGER = -1} and {@code TCP_NODELAY = true}, and the connect future is stored
     * under {@code connectLock}. Completion is handled by
     * {@link #onConnectAttemptComplete(ChannelFuture)}.
     *
     * @param addr remote address to connect to
     * @throws IOException declared by the overridden method
     */
    @Override
    void connect(InetSocketAddress addr) throws IOException {
        firstConnect = new CountDownLatch(1);

        Bootstrap bootstrap = new Bootstrap()
                .group(eventLoopGroup)
                .channel(NettyUtils.nioOrEpollSocketChannel())
                .option(ChannelOption.SO_LINGER, -1)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
        bootstrap = configureBootstrapAllocator(bootstrap);
        bootstrap.validate();

        connectLock.lock();
        try {
            connectFuture = bootstrap.connect(addr);
            connectFuture.addListener((ChannelFutureListener) this::onConnectAttemptComplete);
        } finally {
            connectLock.unlock();
        }
    }

    /**
     * Completion callback for the connect future registered in {@link #connect(InetSocketAddress)}.
     *
     * <p>Runs under {@code connectLock} so the channel cannot be assigned concurrently with
     * {@code cleanup()}. On every outcome the pending future is cleared, the connection thread is
     * woken up and the {@code firstConnect} latch is released.
     *
     * @param channelFuture the completed connect future
     * @throws Exception propagated from priming the connection or sending the prime packet
     */
    private void onConnectAttemptComplete(ChannelFuture channelFuture) throws Exception {
        boolean connected = false;
        connectLock.lock();
        try {
            if (!channelFuture.isSuccess()) {
                LOG.warn("future isn't success.", channelFuture.cause());
            } else if (connectFuture == null) {
                // cleanup() cleared the pending future, yet the connect succeeded anyway:
                // close the channel instead of adopting it.
                LOG.info("connect attempt cancelled");
                channelFuture.channel().close();
            } else {
                channel = channelFuture.channel();

                // Reset per-connection read state before any data can arrive.
                disconnected.set(false);
                initialized = false;
                lenBuffer.clear();
                incomingBuffer = lenBuffer;

                sendThread.primeConnection();
                updateNow();
                updateLastSendAndHeard();

                if (sendThread.tunnelAuthInProgress()) {
                    // Discard stale permits, gate doTransport() on SASL, and push the prime packet now.
                    waitSasl.drainPermits();
                    needSasl.set(true);
                    sendPrimePacket();
                } else {
                    needSasl.set(false);
                }
                connected = true;
            }
        } finally {
            connectFuture = null;
            connectLock.unlock();
            if (connected) {
                LOG.info("channel is connected: {}", channelFuture.channel());
            }
            // Wake the transport loop on success and on failure alike.
            wakeupCnxn();
            firstConnect.countDown();
        }
    }

    /**
     * Tears down the current connection state.
     *
     * <p>Under {@code connectLock}: cancels and clears a pending connect attempt, then closes the
     * established channel (waiting uninterruptibly for the close) and clears it. Afterwards every
     * wake-up marker is removed from the outgoing queue; real packets are left in place.
     */
    @Override
    void cleanup() {
        connectLock.lock();
        try {
            if (connectFuture != null) {
                connectFuture.cancel(false);
                connectFuture = null;
            }
            if (channel != null) {
                channel.close().syncUninterruptibly();
                channel = null;
            }
        } finally {
            connectLock.unlock();
        }
        // Identity comparison is intentional: the wake-up marker is a shared singleton.
        outgoingQueue.removeIf(packet -> packet == WakeupPacket.getInstance());
    }

    /**
     * Requests a graceful shutdown of the event loop group.
     *
     * <p>The future returned by the shutdown call is not awaited.
     */
    @Override
    void close() {
        eventLoopGroup.shutdownGracefully();
    }

    /**
     * Marks SASL negotiation as finished.
     *
     * <p>Clears {@code needSasl} and releases one {@code waitSasl} permit so that a
     * {@code doTransport()} call waiting on the semaphore can proceed.
     */
    @Override
    void saslCompleted() {
        needSasl.set(false);
        waitSasl.release();
    }

    /**
     * No-op in this implementation: the body is intentionally empty.
     */
    @Override
    void connectionPrimed() {
    }

    /**
     * No-op in this implementation: the body is intentionally empty.
     */
    @Override
    void packetAdded() {
    }

    /**
     * Notifies this socket that the connection is being closed.
     *
     * <p>Releases the {@code firstConnect} latch (if a connect was ever started) and wakes the
     * transport loop so that a blocked {@code doTransport()} returns promptly.
     *
     * <p>{@code firstConnect} is only assigned in {@code connect()}, so it is still {@code null}
     * when closing happens before the first connect attempt; the latch is read once into a local
     * and skipped in that case instead of failing with a {@link NullPointerException}.
     */
    @Override
    void onClosing() {
        final CountDownLatch latch = firstConnect;
        if (latch != null) {
            latch.countDown();
        }
        wakeupCnxn();
        LOG.info("channel is told closing");
    }

    /**
     * Wakes up a thread blocked in {@code doTransport()}.
     *
     * <p>While SASL is pending the transport waits on {@code waitSasl}, so a permit is released;
     * in every case the wake-up marker packet is appended to the outgoing queue.
     */
    private void wakeupCnxn() {
        boolean saslPending = needSasl.get();
        if (saslPending) {
            waitSasl.release();
        }
        outgoingQueue.add(WakeupPacket.getInstance());
    }

    /**
     * Performs one round of outbound work, waiting at most {@code waitTimeOut} ms at each step.
     *
     * <p>Sequence: wait for the connect attempt to finish; then either wait for the SASL permit
     * (when {@code needSasl} is set) or poll the outgoing queue; bail out if the client state is
     * no longer alive; fail if the channel was lost; otherwise write the polled packet and anything
     * else queued. {@code updateNow()} always runs on exit.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change"
     * for this method.
     *
     * @param waitTimeOut  maximum wait, in milliseconds, for each blocking step
     * @param pendingQueue queue of packets awaiting a reply, handed to {@code doWrite}
     * @param cnxn         owning connection, handed to {@code doWrite}
     * @throws IOException          when the channel has been lost ({@code EndOfStreamException})
     *                              or a write fails
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    void doTransport(int waitTimeOut, Queue<ClientCnxn.Packet> pendingQueue, ClientCnxn cnxn) throws IOException, InterruptedException {
        try {
            boolean connectFinished = firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS);
            if (!connectFinished) {
                return;
            }

            ClientCnxn.Packet next = null;
            if (!needSasl.get()) {
                next = (ClientCnxn.Packet) outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS);
            } else if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }

            // Woken up because the client is shutting down: put the packet back and leave.
            if (!sendThread.getZkState().isAlive()) {
                addBack(next);
                return;
            }

            // The handler observed a disconnect: put the packet back and report the loss.
            if (disconnected.get()) {
                addBack(next);
                throw new ClientCnxn.EndOfStreamException("channel for sessionid 0x" + Long.toHexString(sessionId) + " is lost");
            }

            if (next != null) {
                doWrite(pendingQueue, next, cnxn);
            }
        } finally {
            updateNow();
        }
    }

    /**
     * Returns a previously polled packet to the front of the outgoing queue.
     *
     * @param head packet to restore; {@code null} and the wake-up marker are ignored
     */
    private void addBack(ClientCnxn.Packet head) {
        if (head == null || head == WakeupPacket.getInstance()) {
            return;
        }
        outgoingQueue.addFirst(head);
    }

    /**
     * Writes the packet to the channel and flushes immediately.
     *
     * @param p packet to send
     * @return the write future
     * @throws IOException if the channel is not available
     */
    private ChannelFuture sendPktAndFlush(ClientCnxn.Packet p) throws IOException {
        final boolean flushNow = true;
        return sendPkt(p, flushNow);
    }

    /**
     * Writes the packet to the channel without flushing; the caller flushes later.
     *
     * @param p packet to send
     * @return the write future
     * @throws IOException if the channel is not available
     */
    private ChannelFuture sendPktOnly(ClientCnxn.Packet p) throws IOException {
        final boolean flushNow = false;
        return sendPkt(p, flushNow);
    }

    /**
     * Serializes the packet and writes it to the current channel.
     *
     * <p>The {@code channel} field is read exactly once into a local: {@code cleanup()} may null
     * the field at any time, and re-reading it after the null check could turn the intended
     * {@link IOException} into a {@link NullPointerException}.
     *
     * @param p       packet to send; its byte buffer is created via {@code createBB()}
     * @param doFlush {@code true} to write and flush, {@code false} to only enqueue the write
     * @return the write future, with the sent-counter listener attached
     * @throws IOException if no channel is currently established
     */
    private ChannelFuture sendPkt(ClientCnxn.Packet p, boolean doFlush) throws IOException {
        final Channel currentChannel = channel;
        if (currentChannel == null) {
            throw new IOException("channel has been closed");
        }
        p.createBB();
        updateLastSend();
        final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb);
        final ChannelFuture result = doFlush
                ? currentChannel.writeAndFlush(writeBuffer)
                : currentChannel.write(writeBuffer);
        result.addListener(onSendPktDoneListener);
        return result;
    }

    /**
     * Removes the head of the outgoing queue and sends it with an immediate flush.
     *
     * <p>Used right after a successful connect when tunnelled authentication is in progress.
     *
     * @throws IOException if the channel is not available
     */
    private void sendPrimePacket() throws IOException {
        ClientCnxn.Packet primePacket = (ClientCnxn.Packet) outgoingQueue.remove();
        sendPktAndFlush(primePacket);
    }

    /**
     * Writes {@code p} and then drains the outgoing queue, flushing once at the end.
     *
     * <p>Wake-up markers are skipped. A packet with a request header whose type is neither 11 nor
     * 100 gets a fresh xid and is added to {@code pendingQueue} before being written.
     *
     * <p>NOTE(review): 11 and 100 are presumably the inlined {@code ZooDefs.OpCode.ping} and
     * {@code ZooDefs.OpCode.auth} constants - confirm against {@code ZooDefs}.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change"
     * for this method.
     *
     * @param pendingQueue queue of packets awaiting a reply; synchronized on while adding
     * @param p            first packet to write
     * @param cnxn         connection that supplies xids
     * @throws IOException if the channel is not available
     */
    private void doWrite(Queue<ClientCnxn.Packet> pendingQueue, ClientCnxn.Packet p, ClientCnxn cnxn) throws IOException {
        updateNow();
        final ClientCnxn.Packet wakeup = WakeupPacket.getInstance();
        boolean sentAny = false;
        for (ClientCnxn.Packet current = p; ; current = (ClientCnxn.Packet) outgoingQueue.remove()) {
            if (current != wakeup) {
                boolean expectsReply = current.requestHeader != null
                        && current.requestHeader.getType() != 11
                        && current.requestHeader.getType() != 100;
                if (expectsReply) {
                    current.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(current);
                    }
                }
                sendPktOnly(current);
                sentAny = true;
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
        }
        // One flush for the whole batch.
        if (sentAny) {
            channel.flush();
        }
    }

    /**
     * Sends a single packet immediately (write followed by flush).
     *
     * @param p packet to send
     * @throws IOException if the channel is not available
     */
    @Override
    void sendPacket(ClientCnxn.Packet p) throws IOException {
        sendPktAndFlush(p);
    }

    /**
     * Returns the remote address of the current channel.
     *
     * @return the channel's remote address, or {@code null} when there is no channel
     */
    @Override
    SocketAddress getRemoteSocketAddress() {
        // Snapshot the field so the null check and the call see the same reference.
        final Channel snapshot = channel;
        if (snapshot == null) {
            return null;
        }
        return snapshot.remoteAddress();
    }

    /**
     * Returns the local address of the current channel.
     *
     * @return the channel's local address, or {@code null} when there is no channel
     */
    @Override
    SocketAddress getLocalSocketAddress() {
        // Snapshot the field so the null check and the call see the same reference.
        final Channel snapshot = channel;
        if (snapshot == null) {
            return null;
        }
        return snapshot.localAddress();
    }

    /**
     * Disconnects the current channel, if any, and waits uninterruptibly for the disconnect
     * to complete.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    void testableCloseSocket() throws IOException {
        final Channel snapshot = channel;
        if (snapshot == null) {
            return;
        }
        snapshot.disconnect().awaitUninterruptibly();
    }

    /**
     * Installs an allocator override that is applied to bootstraps built by later connects.
     *
     * @param allocator allocator to use; {@code null} removes the override
     */
    static void setTestAllocator(ByteBufAllocator allocator) {
        final ByteBufAllocator override = allocator;
        TEST_ALLOCATOR.set(override);
    }

    /**
     * Removes any allocator override so later connects use the default allocator.
     */
    static void clearTestAllocator() {
        final ByteBufAllocator none = null;
        TEST_ALLOCATOR.set(none);
    }

    private class ZKClientHandler
    extends SimpleChannelInboundHandler<ByteBuf> {
        AtomicBoolean channelClosed = new AtomicBoolean(false);

        private ZKClientHandler() {
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            LOG.info("channel is disconnected: {}", (Object)ctx.channel());
            this.cleanup();
        }

        private void cleanup() {
            if (!this.channelClosed.compareAndSet(false, true)) {
                return;
            }
            ClientCnxnSocketNetty.this.disconnected.set(true);
            ClientCnxnSocketNetty.this.onClosing();
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
            ClientCnxnSocketNetty.this.updateNow();
            while (buf.isReadable()) {
                if (ClientCnxnSocketNetty.this.incomingBuffer.remaining() > buf.readableBytes()) {
                    int newLimit = ClientCnxnSocketNetty.this.incomingBuffer.position() + buf.readableBytes();
                    ClientCnxnSocketNetty.this.incomingBuffer.limit(newLimit);
                }
                buf.readBytes(ClientCnxnSocketNetty.this.incomingBuffer);
                ClientCnxnSocketNetty.this.incomingBuffer.limit(ClientCnxnSocketNetty.this.incomingBuffer.capacity());
                if (ClientCnxnSocketNetty.this.incomingBuffer.hasRemaining()) continue;
                ClientCnxnSocketNetty.this.incomingBuffer.flip();
                if (ClientCnxnSocketNetty.this.incomingBuffer == ClientCnxnSocketNetty.this.lenBuffer) {
                    ClientCnxnSocketNetty.this.recvCount.getAndIncrement();
                    ClientCnxnSocketNetty.this.readLength();
                    continue;
                }
                if (!ClientCnxnSocketNetty.this.initialized) {
                    ClientCnxnSocketNetty.this.readConnectResult();
                    ClientCnxnSocketNetty.this.lenBuffer.clear();
                    ClientCnxnSocketNetty.this.incomingBuffer = ClientCnxnSocketNetty.this.lenBuffer;
                    ClientCnxnSocketNetty.this.initialized = true;
                    ClientCnxnSocketNetty.this.updateLastHeard();
                    continue;
                }
                ClientCnxnSocketNetty.this.sendThread.readResponse(ClientCnxnSocketNetty.this.incomingBuffer);
                ClientCnxnSocketNetty.this.lenBuffer.clear();
                ClientCnxnSocketNetty.this.incomingBuffer = ClientCnxnSocketNetty.this.lenBuffer;
                ClientCnxnSocketNetty.this.updateLastHeard();
            }
            ClientCnxnSocketNetty.this.wakeupCnxn();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            LOG.error("Unexpected throwable", cause);
            this.cleanup();
        }
    }

    /**
     * Channel initializer that installs an optional SSL handler followed by a
     * {@code ZKClientHandler} on every new channel.
     */
    private class ZKClientPipelineFactory extends ChannelInitializer<SocketChannel> {
        private SSLContext sslContext = null;
        private SSLEngine sslEngine = null;
        private String host;
        private int port;

        /**
         * @param host peer host passed to the SSL engine
         * @param port peer port passed to the SSL engine
         */
        public ZKClientPipelineFactory(String host, int port) {
            this.host = host;
            this.port = port;
        }

        /**
         * Builds the pipeline: SSL first when the {@code zookeeper.client.secure} setting is
         * enabled, then the client handler.
         */
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            boolean secureClient = clientConfig.getBoolean("zookeeper.client.secure");
            if (secureClient) {
                initSSL(pipeline);
            }
            pipeline.addLast("handler", new ZKClientHandler());
        }

        /**
         * Creates the SSL context and a client-mode engine on first use, then adds an
         * {@code SslHandler} wrapping that engine to the pipeline.
         *
         * @param pipeline pipeline to extend
         * @throws X509Exception.SSLContextException if the SSL context cannot be created
         */
        private synchronized void initSSL(ChannelPipeline pipeline) throws X509Exception.SSLContextException {
            boolean engineReady = sslContext != null && sslEngine != null;
            if (!engineReady) {
                try (ClientX509Util x509Util = new ClientX509Util()) {
                    sslContext = x509Util.createSSLContext(clientConfig);
                    sslEngine = sslContext.createSSLEngine(host, port);
                    sslEngine.setUseClientMode(true);
                }
            }
            pipeline.addLast("ssl", new SslHandler(sslEngine));
            LOG.info("SSL handler added for channel: {}", pipeline.channel());
        }
    }

    /**
     * Holder for the singleton marker packet that is queued purely to wake up the transport loop.
     * The marker is always compared by identity and is never written to the channel.
     */
    private static final class WakeupPacket {
        private static final ClientCnxn.Packet INSTANCE = new ClientCnxn.Packet(null, null, null, null, null);

        /** Not instantiable: this class only exposes the shared marker. */
        private WakeupPacket() {
        }

        /**
         * @return the shared wake-up marker packet
         */
        public static ClientCnxn.Packet getInstance() {
            return INSTANCE;
        }
    }
}

