/*
 * Decompiled with CFR 0.152.
 */
package mousio.etcd4j.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsServerAddressStreamProvider;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.concurrent.CancellationException;
import mousio.client.ConnectionState;
import mousio.client.retry.RetryHandler;
import mousio.etcd4j.EtcdSecurityContext;
import mousio.etcd4j.promises.EtcdResponsePromise;
import mousio.etcd4j.requests.EtcdRequest;
import mousio.etcd4j.transport.EtcdClientImpl;
import mousio.etcd4j.transport.EtcdNettyConfig;
import mousio.etcd4j.transport.EtcdResponseHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty based {@link EtcdClientImpl}.
 *
 * <p>Every request opens its own connection through a shared {@link Bootstrap}, writes a single
 * HTTP request on it and closes the channel once the request's Netty promise completes. The index
 * of the endpoint that was last connected successfully is remembered in
 * {@link #lastWorkingUriIndex} and used as the starting point for later requests.
 */
public class EtcdNettyClient
implements EtcdClientImpl {
    private static final Logger logger = LoggerFactory.getLogger(EtcdNettyClient.class);
    /** Port used when an endpoint URI does not specify one. */
    private static final int DEFAULT_PORT = 2379;
    /** Environment variable consulted for an endpoint when no URIs were supplied. */
    private static final String ENV_ETCD4J_ENDPOINT = "ETCD4J_ENDPOINT";
    private final EventLoopGroup eventLoopGroup;
    private final URI[] uris;
    private final Bootstrap bootstrap;
    private final EtcdNettyConfig config;
    private final EtcdSecurityContext securityContext;
    /** Index into the endpoint list of the last endpoint a connection succeeded to. */
    protected volatile int lastWorkingUriIndex;

    /**
     * Creates a client with a default {@link EtcdNettyConfig} and the given Netty SSL context.
     *
     * @param sslContext Netty SSL context wrapped into an {@link EtcdSecurityContext}
     * @param uri        etcd endpoints
     */
    public EtcdNettyClient(SslContext sslContext, URI ... uri) {
        this(new EtcdNettyConfig(), sslContext, uri);
    }

    /**
     * Creates a client with a default {@link EtcdNettyConfig} and the given security context.
     *
     * @param securityContext security settings (SSL and/or credentials)
     * @param uri             etcd endpoints
     */
    public EtcdNettyClient(EtcdSecurityContext securityContext, URI ... uri) {
        this(new EtcdNettyConfig(), securityContext, uri);
    }

    /**
     * Creates a client with the given configuration and Netty SSL context.
     *
     * @param config     transport configuration
     * @param sslContext Netty SSL context wrapped into an {@link EtcdSecurityContext}
     * @param uris       etcd endpoints
     */
    public EtcdNettyClient(EtcdNettyConfig config, SslContext sslContext, URI ... uris) {
        this(config, new EtcdSecurityContext(sslContext), uris);
    }

    /**
     * Creates a client with the given configuration and {@link EtcdSecurityContext#NONE}.
     *
     * @param config transport configuration
     * @param uris   etcd endpoints
     */
    public EtcdNettyClient(EtcdNettyConfig config, URI ... uris) {
        this(config, EtcdSecurityContext.NONE, uris);
    }

    /**
     * Creates a client and prepares the shared {@link Bootstrap}.
     *
     * <p>Clones of {@code config} and {@code securityContext} are stored in the client's fields.
     * If the configuration carries no event loop group a new {@link NioEventLoopGroup} is created.
     *
     * @param config          transport configuration
     * @param securityContext security settings (SSL and/or credentials)
     * @param uris            etcd endpoints; may be empty, in which case the
     *                        {@code ETCD4J_ENDPOINT} environment variable is consulted on connect
     */
    public EtcdNettyClient(final EtcdNettyConfig config, final EtcdSecurityContext securityContext, URI ... uris) {
        logger.info("Setting up Etcd4j Netty client");
        this.lastWorkingUriIndex = 0;
        this.config = config.clone();
        this.securityContext = securityContext.clone();
        this.uris = uris;
        this.eventLoopGroup = config.getEventLoopGroup() == null ? new NioEventLoopGroup() : config.getEventLoopGroup();

        Bootstrap clientBootstrap = new Bootstrap();
        clientBootstrap.group(this.eventLoopGroup);
        clientBootstrap.channel(config.getSocketChannelClass());
        clientBootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        clientBootstrap.option(ChannelOption.TCP_NODELAY, true);
        clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        // Addresses are handed to connect() unresolved (see connectAddress), so resolution goes
        // through this resolver group.
        clientBootstrap.resolver(new DnsAddressResolverGroup(NioDatagramChannel.class, DefaultDnsServerAddressStreamProvider.INSTANCE));
        clientBootstrap.handler(new ChannelInitializer<SocketChannel>(){

            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline p = ch.pipeline();
                if (securityContext.hasNettySsl()) {
                    p.addLast(securityContext.nettySslContext().newHandler(ch.alloc()));
                } else if (securityContext.hasSsl()) {
                    // NOTE(review): the engine is used as created; client mode is not set here.
                    // Confirm the supplied SSL context / engine is configured for client use.
                    p.addLast(new SslHandler(securityContext.sslContext().createSSLEngine()));
                }
                p.addLast("codec", new HttpClientCodec());
                p.addLast("auth", new HttpBasicAuthHandler());
                p.addLast("chunkedWriter", new ChunkedWriteHandler());
                p.addLast("aggregate", new HttpObjectAggregator(config.getMaxFrameSize()));
            }
        });
        this.bootstrap = clientBootstrap;
    }

    /**
     * @return the bootstrap used to open connections
     */
    protected Bootstrap getBootstrap() {
        return this.bootstrap;
    }

    /**
     * Sends a request. If the request has no promise yet, one is created whose retry handler
     * reconnects through this client.
     *
     * @param etcdRequest request to send
     * @param <R>         response type
     * @return the promise attached to the request
     * @throws IOException if connecting fails synchronously, e.g. no endpoint is configured
     */
    @Override
    public <R> EtcdResponsePromise<R> send(final EtcdRequest<R> etcdRequest) throws IOException {
        ConnectionState connectionState = new ConnectionState(this.uris, this.lastWorkingUriIndex);
        if (etcdRequest.getPromise() == null) {
            etcdRequest.setPromise(new EtcdResponsePromise<R>(etcdRequest.getRetryPolicy(), connectionState, new RetryHandler(){

                @Override
                public void doRetry(ConnectionState retryState) throws IOException {
                    EtcdNettyClient.this.connect(etcdRequest, retryState);
                }
            }));
        }
        this.connect(etcdRequest, connectionState);
        return etcdRequest.getPromise();
    }

    /**
     * Connects using the connection state already stored on the request's promise.
     *
     * @param etcdRequest request to send
     * @param <R>         response type
     * @throws IOException if connecting fails synchronously
     */
    protected <R> void connect(EtcdRequest<R> etcdRequest) throws IOException {
        this.connect(etcdRequest, etcdRequest.getPromise().getConnectionState());
    }

    /**
     * Opens a connection for the request and, once connected, writes the HTTP request.
     *
     * <p>The target is chosen in this order: the request URL itself when it carries both host and
     * port; otherwise the endpoint at {@code connectionState.uriIndex}; otherwise, when no
     * endpoints were supplied, the value of the {@code ETCD4J_ENDPOINT} environment variable.
     *
     * <p>If the event loop group is shutting down or shut down, the request's Netty promise is
     * cancelled and nothing is sent.
     *
     * @param etcdRequest     request to send
     * @param connectionState endpoints and current endpoint index
     * @param <R>             response type
     * @throws IOException if no endpoint can be determined
     */
    protected <R> void connect(final EtcdRequest<R> etcdRequest, final ConnectionState connectionState) throws IOException {
        if (this.eventLoopGroup.isShuttingDown() || this.eventLoopGroup.isShutdown() || this.eventLoopGroup.isTerminated()) {
            etcdRequest.getPromise().getNettyPromise().cancel(true);
            logger.debug("Retry canceled because of closed etcd client");
            return;
        }

        // Explicitly final: it is captured by the listeners below.
        final URI uri;
        URI requestUri = URI.create(etcdRequest.getUrl());
        if (requestUri.getHost() != null && requestUri.getPort() > -1) {
            uri = requestUri;
        } else if (connectionState.uris.length > 0) {
            uri = connectionState.uris[connectionState.uriIndex];
        } else {
            String endpointUri = System.getenv(ENV_ETCD4J_ENDPOINT);
            if (endpointUri == null) {
                // Previously this case fell through to uris[uriIndex] on an empty array and failed
                // with an ArrayIndexOutOfBoundsException.
                throw new IOException("No etcd endpoint available: no URIs were supplied and environment variable " + ENV_ETCD4J_ENDPOINT + " is not set");
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Will use environment variable {} as uri with value {}", ENV_ETCD4J_ENDPOINT, endpointUri);
            }
            uri = URI.create(endpointUri);
        }

        ChannelFuture connectFuture = this.bootstrap.connect(this.connectAddress(uri));
        etcdRequest.getPromise().getConnectionState().loop = connectFuture.channel().eventLoop();
        etcdRequest.getPromise().attachNettyPromise(connectFuture.channel().eventLoop().newPromise());
        connectFuture.addListener(new GenericFutureListener<ChannelFuture>(){

            @Override
            public void operationComplete(final ChannelFuture f) throws Exception {
                if (!f.isSuccess()) {
                    Throwable cause = f.cause();
                    if (logger.isDebugEnabled()) {
                        // Log the address that was actually dialled. Indexing connectionState.uris
                        // here would throw when the endpoint came from the environment variable
                        // (empty array) and abort this listener before the promise is handled.
                        logger.debug("Connection failed to {}, cause {}", uri, cause);
                    }
                    if (cause instanceof ClosedChannelException || cause instanceof IllegalStateException) {
                        etcdRequest.getPromise().cancel(new CancellationException("Channel closed"));
                    } else {
                        etcdRequest.getPromise().handleRetry(f.cause());
                    }
                    return;
                }

                // The promise was cancelled while the connect was still in flight.
                if (etcdRequest.getPromise().getNettyPromise().isCancelled()) {
                    f.channel().close();
                    etcdRequest.getPromise().getNettyPromise().setFailure(new CancellationException());
                    return;
                }

                final Promise listenedToPromise = etcdRequest.getPromise().getNettyPromise();
                listenedToPromise.addListener(new GenericFutureListener<Future<?>>(){

                    @Override
                    public void operationComplete(Future<?> future) throws Exception {
                        // Close only if the request is still bound to the promise this listener
                        // was registered on; a different promise may have been attached since.
                        if (etcdRequest.getPromise().getNettyPromise() == listenedToPromise) {
                            f.channel().close();
                        }
                    }
                });

                if (logger.isDebugEnabled()) {
                    logger.debug("Connected to {} ({})", f.channel().remoteAddress().toString(), connectionState.uriIndex);
                }
                EtcdNettyClient.this.lastWorkingUriIndex = connectionState.uriIndex;
                EtcdNettyClient.this.modifyPipeLine(etcdRequest, f.channel().pipeline());
                EtcdNettyClient.this.createAndSendHttpRequest(uri, etcdRequest.getUrl(), etcdRequest, f.channel()).addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            etcdRequest.getPromise().setException(future.cause());
                            // NOTE(review): this shuts down the channel's event loop when the
                            // listener runs outside of it. Kept as-is; confirm it is intended,
                            // since the loop may be shared with other channels.
                            if (!f.channel().eventLoop().inEventLoop()) {
                                f.channel().eventLoop().shutdownGracefully();
                            }
                            f.channel().close();
                        }
                    }
                });
                f.channel().closeFuture().addListener(new ChannelFutureListener(){

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Connection closed for request {} on uri {} ", etcdRequest.getMethod().name(), etcdRequest.getUri());
                        }
                    }
                });
            }
        });
    }

    /**
     * Adds the per-request handlers to a freshly connected channel: an optional read timeout at
     * the head of the pipeline, the response handler, and a trailing handler that turns pipeline
     * exceptions into a retry.
     *
     * @param req      request the channel serves
     * @param pipeline pipeline of the connected channel
     * @param <R>      response type
     */
    private <R> void modifyPipeLine(final EtcdRequest<R> req, ChannelPipeline pipeline) {
        final EtcdResponseHandler<R> handler = new EtcdResponseHandler<R>(this, req);
        if (req.hasTimeout()) {
            pipeline.addFirst(new ReadTimeoutHandler(req.getTimeout(), req.getTimeoutUnit()));
        }
        pipeline.addLast(handler);
        pipeline.addLast(new ChannelHandlerAdapter(){

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                handler.retried(true);
                req.getPromise().handleRetry(cause);
            }
        });
    }

    /**
     * Builds the HTTP request for {@code etcdRequest}, stores it on the request and writes it to
     * the channel.
     *
     * <p>Request parameters are sent as form body attributes for POST and PUT and as query string
     * parameters for every other method.
     *
     * @param server      endpoint the channel is connected to; used for the Host header unless the
     *                    configuration supplies a host name
     * @param uri         request URI
     * @param etcdRequest request to send
     * @param channel     connected channel
     * @param <R>         response type
     * @return future of the last write issued
     * @throws Exception if encoding the request body fails
     */
    private <R> ChannelFuture createAndSendHttpRequest(URI server, String uri, EtcdRequest<R> etcdRequest, Channel channel) throws Exception {
        HttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, etcdRequest.getMethod(), uri);
        httpRequest.headers().add(HttpHeaderNames.CONNECTION, "keep-alive");
        if (!this.config.hasHostName()) {
            // Use the same port the connection was made to; URI.getPort() is -1 when the endpoint
            // has no explicit port, which would otherwise yield a "host:-1" header.
            httpRequest.headers().add(HttpHeaderNames.HOST, server.getHost() + ":" + EtcdNettyClient.portOf(server));
        } else {
            httpRequest.headers().add(HttpHeaderNames.HOST, this.config.getHostName());
        }

        HttpPostRequestEncoder bodyRequestEncoder = null;
        Map<String, String> keyValuePairs = etcdRequest.getRequestParams();
        if (keyValuePairs != null && !keyValuePairs.isEmpty()) {
            HttpMethod etcdRequestMethod = etcdRequest.getMethod();
            if (HttpMethod.POST.equals(etcdRequestMethod) || HttpMethod.PUT.equals(etcdRequestMethod)) {
                bodyRequestEncoder = new HttpPostRequestEncoder(httpRequest, false);
                for (Map.Entry<String, String> entry : keyValuePairs.entrySet()) {
                    bodyRequestEncoder.addBodyAttribute(entry.getKey(), entry.getValue());
                }
                httpRequest = bodyRequestEncoder.finalizeRequest();
            } else {
                QueryStringEncoder encoder = new QueryStringEncoder(uri);
                for (Map.Entry<String, String> entry : keyValuePairs.entrySet()) {
                    encoder.addParam(entry.getKey(), entry.getValue());
                }
                httpRequest.setUri(encoder.toString());
            }
        }

        etcdRequest.setHttpRequest(httpRequest);
        ChannelFuture future = channel.write(httpRequest);
        if (bodyRequestEncoder != null && bodyRequestEncoder.isChunked()) {
            // The encoder itself is written as chunked input after the request head.
            future = channel.write(bodyRequestEncoder);
        }
        channel.flush();
        return future;
    }

    /**
     * Shuts the event loop group down, but only when the configuration marks it as managed by
     * this client.
     */
    @Override
    public void close() {
        logger.info("Shutting down Etcd4j Netty client");
        if (this.config.isManagedEventLoopGroup()) {
            logger.debug("Shutting down Netty Loop");
            this.eventLoopGroup.shutdownGracefully();
        }
    }

    /**
     * @param uri endpoint
     * @return unresolved socket address for the endpoint, using {@link #DEFAULT_PORT} when the URI
     *         has no port
     */
    private InetSocketAddress connectAddress(URI uri) {
        return InetSocketAddress.createUnresolved(uri.getHost(), EtcdNettyClient.portOf(uri));
    }

    /**
     * @param uri endpoint
     * @return the URI's port, or {@link #DEFAULT_PORT} when the URI does not specify one
     */
    private static int portOf(URI uri) {
        return uri.getPort() == -1 ? DEFAULT_PORT : uri.getPort();
    }

    /**
     * Outbound handler that adds an HTTP Basic {@code Authorization} header to every outgoing
     * {@link HttpRequest} when the security context has credentials.
     */
    private class HttpBasicAuthHandler
    extends ChannelOutboundHandlerAdapter {

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (EtcdNettyClient.this.securityContext.hasCredentials() && msg instanceof HttpRequest) {
                this.addBasicAuthHeader((HttpRequest)msg);
            }
            ctx.write(msg, promise);
        }

        /**
         * Adds {@code Authorization: Basic base64(username:password)} to the request.
         *
         * @param request outgoing request
         */
        private void addBasicAuthHeader(HttpRequest request) {
            String credentials = EtcdNettyClient.this.securityContext.username() + ":" + EtcdNettyClient.this.securityContext.password();
            // breakLines=false: the single-argument encode() uses the STANDARD dialect, which
            // inserts line feeds into longer output and would corrupt the header value.
            String auth = Base64.encode(Unpooled.copiedBuffer(credentials, CharsetUtil.UTF_8), false).toString(CharsetUtil.UTF_8);
            request.headers().add(HttpHeaderNames.AUTHORIZATION, "Basic " + auth);
        }
    }
}

