package org.apache.omid.tso;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.omid.NetworkUtils;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.proto.TSOProto;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/TestTSOChannelHandlerNetty.class */
public class TestTSOChannelHandlerNetty {
    private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);

    @Mock
    private RequestProcessor requestProcessor;

    @BeforeMethod
    public void beforeTestMethod() {
        MockitoAnnotations.initMocks(this);
    }

    private TSOChannelHandler getTSOChannelHandler(int i) {
        TSOServerConfig tSOServerConfig = new TSOServerConfig();
        tSOServerConfig.setPort(i);
        return new TSOChannelHandler(tSOServerConfig, this.requestProcessor, new NullMetricsProvider());
    }

    @Test(timeOut = 10000)
    public void testMainAPI() throws Exception {
        int freePort = NetworkUtils.getFreePort();
        TSOChannelHandler tSOChannelHandler = getTSOChannelHandler(freePort);
        try {
            Assert.assertNull(tSOChannelHandler.listeningChannel);
            Assert.assertNull(tSOChannelHandler.allChannels);
            tSOChannelHandler.reconnect();
            Assert.assertTrue(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 1);
            Assert.assertEquals(((InetSocketAddress) tSOChannelHandler.listeningChannel.localAddress()).getPort(), freePort);
            tSOChannelHandler.closeConnection();
            Assert.assertFalse(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 0);
            tSOChannelHandler.closeConnection();
            Assert.assertFalse(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 0);
            tSOChannelHandler.reconnect();
            Assert.assertTrue(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 1);
            tSOChannelHandler.reconnect();
            Assert.assertTrue(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 1);
            tSOChannelHandler.close();
            Assert.assertFalse(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 0);
            try {
                tSOChannelHandler.reconnect();
                Assert.fail("Can't reconnect after closing");
            } catch (Exception e) {
                Assert.assertFalse(tSOChannelHandler.listeningChannel.isOpen());
                Assert.assertEquals(tSOChannelHandler.allChannels.size(), 0);
            }
        } finally {
            if (tSOChannelHandler != null) {
                tSOChannelHandler.close();
            }
        }
    }

    @Test(timeOut = 10000)
    public void testNettyConnectionToTSOFromClient() throws Exception {
        int freePort = NetworkUtils.getFreePort();
        TSOChannelHandler tSOChannelHandler = getTSOChannelHandler(freePort);
        try {
            Bootstrap createNettyClientBootstrap = createNettyClientBootstrap();
            ChannelFuture connect = createNettyClientBootstrap.connect(new InetSocketAddress("localhost", freePort));
            do {
            } while (!connect.isDone());
            Assert.assertFalse(connect.isSuccess());
            tSOChannelHandler.reconnect();
            Assert.assertTrue(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 1);
            ChannelFuture connect2 = createNettyClientBootstrap.connect(new InetSocketAddress("localhost", freePort));
            do {
            } while (!connect2.isDone());
            Assert.assertTrue(connect2.isSuccess());
            Assert.assertTrue(connect2.channel().isActive());
            do {
            } while (tSOChannelHandler.allChannels.size() != 2);
            connect2.channel().close().await();
            do {
            } while (tSOChannelHandler.allChannels.size() != 1);
            ChannelFuture connect3 = createNettyClientBootstrap.connect(new InetSocketAddress("localhost", freePort));
            do {
            } while (!connect3.isDone());
            Assert.assertTrue(connect3.isSuccess());
            do {
            } while (tSOChannelHandler.allChannels.size() != 2);
            tSOChannelHandler.closeConnection();
            Assert.assertFalse(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 0);
            TimeUnit.SECONDS.sleep(1L);
            Assert.assertFalse(connect3.channel().isOpen());
            tSOChannelHandler.reconnect();
            Assert.assertTrue(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 1);
            ChannelFuture connect4 = createNettyClientBootstrap.connect(new InetSocketAddress("localhost", freePort));
            do {
            } while (!connect4.isDone());
            Assert.assertTrue(connect4.isSuccess());
            do {
            } while (tSOChannelHandler.allChannels.size() != 2);
            tSOChannelHandler.reconnect();
            Assert.assertTrue(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 1);
            TimeUnit.SECONDS.sleep(1L);
            Assert.assertFalse(connect4.channel().isOpen());
            tSOChannelHandler.close();
            Assert.assertFalse(tSOChannelHandler.listeningChannel.isOpen());
            Assert.assertEquals(tSOChannelHandler.allChannels.size(), 0);
            if (tSOChannelHandler != null) {
                tSOChannelHandler.close();
            }
        } catch (Throwable th) {
            if (tSOChannelHandler != null) {
                tSOChannelHandler.close();
            }
            throw th;
        }
    }

    @Test(timeOut = 10000)
    public void testNettyChannelWriting() throws Exception {
        int freePort = NetworkUtils.getFreePort();
        TSOChannelHandler tSOChannelHandler = getTSOChannelHandler(freePort);
        try {
            tSOChannelHandler.reconnect();
            ChannelFuture connect = createNettyClientBootstrap().connect(new InetSocketAddress("localhost", freePort));
            do {
            } while (!connect.isDone());
            Assert.assertTrue(connect.isSuccess());
            Assert.assertTrue(connect.channel().isActive());
            Channel channel = connect.channel();
            do {
            } while (tSOChannelHandler.allChannels.size() != 2);
            TSOProto.HandshakeRequest.Builder newBuilder = TSOProto.HandshakeRequest.newBuilder();
            newBuilder.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
            connect.channel().writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(newBuilder.build()).build());
            testWritingTimestampRequest(channel);
            testWritingCommitRequest(channel);
            testWritingFenceRequest(channel);
            if (tSOChannelHandler != null) {
                tSOChannelHandler.close();
            }
        } catch (Throwable th) {
            if (tSOChannelHandler != null) {
                tSOChannelHandler.close();
            }
            throw th;
        }
    }

    private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
        Mockito.reset(new RequestProcessor[]{this.requestProcessor});
        TSOProto.Request.Builder newBuilder = TSOProto.Request.newBuilder();
        newBuilder.setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build());
        channel.writeAndFlush(newBuilder.build()).await();
        ((RequestProcessor) Mockito.verify(this.requestProcessor, Mockito.timeout(100L).times(1))).timestampRequest((Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((RequestProcessor) Mockito.verify(this.requestProcessor, Mockito.timeout(100L).times(0))).commitRequest(Matchers.anyLong(), Matchers.anyCollectionOf(Long.class), Matchers.anyCollectionOf(Long.class), Matchers.anyBoolean(), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
    }

    private void testWritingCommitRequest(Channel channel) throws InterruptedException {
        Mockito.reset(new RequestProcessor[]{this.requestProcessor});
        TSOProto.Request.Builder newBuilder = TSOProto.Request.newBuilder();
        TSOProto.CommitRequest.Builder newBuilder2 = TSOProto.CommitRequest.newBuilder();
        newBuilder2.setStartTimestamp(666L);
        newBuilder2.addCellId(666L);
        newBuilder.setCommitRequest(newBuilder2.build());
        Assert.assertTrue(newBuilder.build().hasCommitRequest());
        channel.writeAndFlush(newBuilder.build()).await();
        ((RequestProcessor) Mockito.verify(this.requestProcessor, Mockito.timeout(100L).times(0))).timestampRequest((Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((RequestProcessor) Mockito.verify(this.requestProcessor, Mockito.timeout(100L).times(1))).commitRequest(Matchers.eq(666L), Matchers.anyCollectionOf(Long.class), Matchers.anyCollectionOf(Long.class), Matchers.eq(false), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
    }

    private void testWritingFenceRequest(Channel channel) throws InterruptedException {
        Mockito.reset(new RequestProcessor[]{this.requestProcessor});
        TSOProto.Request.Builder newBuilder = TSOProto.Request.newBuilder();
        TSOProto.FenceRequest.Builder newBuilder2 = TSOProto.FenceRequest.newBuilder();
        newBuilder2.setTableId(666L);
        newBuilder.setFenceRequest(newBuilder2.build());
        Assert.assertTrue(newBuilder.build().hasFenceRequest());
        channel.writeAndFlush(newBuilder.build()).await();
        ((RequestProcessor) Mockito.verify(this.requestProcessor, Mockito.timeout(100L).times(0))).timestampRequest((Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
        ((RequestProcessor) Mockito.verify(this.requestProcessor, Mockito.timeout(100L).times(1))).fenceRequest(Matchers.eq(666L), (Channel) Matchers.any(Channel.class), (MonitoringContext) Matchers.any(MonitoringContextImpl.class));
    }

    private Bootstrap createNettyClientBootstrap() {
        NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1, new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build());
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.option(ChannelOption.SO_REUSEADDR, true);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
        bootstrap.group(nioEventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: org.apache.omid.tso.TestTSOChannelHandlerNetty.1
            public void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 4));
                pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
                pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
                pipeline.addLast("protobufencoder", new ProtobufEncoder());
                pipeline.addLast("testhandler", new ChannelInboundHandlerAdapter() { // from class: org.apache.omid.tso.TestTSOChannelHandlerNetty.1.1
                    public void channelActive(ChannelHandlerContext channelHandlerContext) {
                        TestTSOChannelHandlerNetty.LOG.info("Channel {} active", channelHandlerContext.channel());
                    }

                    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
                        TestTSOChannelHandlerNetty.LOG.error("Error on channel {}", channelHandlerContext.channel(), th);
                    }
                });
            }
        });
        return bootstrap;
    }
}
