/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.ImmediateEventExecutor;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.component.netty.ClientInitializerFactory;
import org.apache.camel.component.netty.DefaultClientInitializerFactory;
import org.apache.camel.component.netty.DefaultNettyCamelStateCorrelationManager;
import org.apache.camel.component.netty.NettyCamelState;
import org.apache.camel.component.netty.NettyCamelStateCorrelationManager;
import org.apache.camel.component.netty.NettyConfiguration;
import org.apache.camel.component.netty.NettyEndpoint;
import org.apache.camel.component.netty.NettyHelper;
import org.apache.camel.component.netty.NettyPayloadHelper;
import org.apache.camel.component.netty.NettyWorkerPoolBuilder;
import org.apache.camel.component.netty.SharedSingletonObjectPool;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.IOHelper;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyProducer
extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
    private static final AttributeKey<NettyCamelStateCorrelationManager> CORRELATION_MANAGER_ATTR = AttributeKey.valueOf("NettyCamelStateCorrelationManager");
    private ChannelGroup allChannels;
    private CamelContext context;
    private NettyConfiguration configuration;
    private ClientInitializerFactory pipelineFactory;
    private CamelLogger noReplyLogger;
    private EventLoopGroup workerGroup;
    private volatile ObjectPool<ChannelFuture> pool;
    private NettyCamelStateCorrelationManager correlationManager;

    public NettyProducer(NettyEndpoint nettyEndpoint, NettyConfiguration configuration) {
        super(nettyEndpoint);
        this.configuration = configuration;
        this.context = this.getEndpoint().getCamelContext();
        this.noReplyLogger = new CamelLogger(LOG, configuration.getNoReplyLogLevel());
    }

    @Override
    public NettyEndpoint getEndpoint() {
        return (NettyEndpoint)super.getEndpoint();
    }

    public CamelContext getContext() {
        return this.context;
    }

    public NettyCamelStateCorrelationManager getCorrelationManager() {
        return this.correlationManager;
    }

    protected boolean isTcp() {
        return this.configuration.getProtocol().equalsIgnoreCase("tcp");
    }

    @Override
    protected void doStart() throws Exception {
        ClientInitializerFactory factory;
        if (this.configuration.isProducerPoolEnabled()) {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxTotal(this.configuration.getProducerPoolMaxTotal());
            config.setMinIdle(this.configuration.getProducerPoolMinIdle());
            config.setMaxIdle(this.configuration.getProducerPoolMaxIdle());
            config.setTestOnBorrow(true);
            config.setTestWhileIdle(true);
            config.setTimeBetweenEvictionRuns(Duration.ofSeconds(30L));
            config.setMinEvictableIdleTime(Duration.ofMillis(this.configuration.getProducerPoolMinEvictableIdle()));
            this.pool = new GenericObjectPool<ChannelFuture>(new NettyProducerPoolableObjectFactory(this), config);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created NettyProducer pool[maxTotal={}, minIdle={}, maxIdle={}, minEvictableIdleDuration={}] -> {}", new Object[]{config.getMaxTotal(), config.getMaxIdle(), config.getMaxIdle(), config.getMinEvictableIdleDuration(), this.pool});
            }
        } else {
            this.pool = new SharedSingletonObjectPool<ChannelFuture>(new NettyProducerPoolableObjectFactory(this));
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created NettyProducer shared singleton pool -> {}", this.pool);
            }
        }
        if (this.configuration.getWorkerGroup() == null) {
            this.workerGroup = new NettyWorkerPoolBuilder().withNativeTransport(this.configuration.isNativeTransport()).withWorkerCount(this.configuration.getWorkerCount()).withName("NettyClientTCPWorker").build();
        }
        this.pipelineFactory = (factory = this.configuration.getClientInitializerFactory()) != null ? factory.createPipelineFactory(this) : new DefaultClientInitializerFactory(this);
        this.allChannels = this.configuration.getChannelGroup() == null ? new DefaultChannelGroup("NettyProducer", ImmediateEventExecutor.INSTANCE) : this.configuration.getChannelGroup();
        if (!this.configuration.isLazyChannelCreation()) {
            ChannelFuture channelFuture = this.pool.borrowObject();
            channelFuture.get();
            this.pool.returnObject(channelFuture);
        }
        this.correlationManager = this.configuration.getCorrelationManager() != null ? this.configuration.getCorrelationManager() : new DefaultNettyCamelStateCorrelationManager();
        if (this.correlationManager instanceof CamelContextAware) {
            ((CamelContextAware)((Object)this.correlationManager)).setCamelContext(this.getContext());
        }
        ServiceHelper.startService((Object)this.correlationManager);
        super.doStart();
    }

    @Override
    protected void doStop() throws Exception {
        LOG.debug("Stopping producer at address: {}", (Object)this.configuration.getAddress());
        if (this.pool != null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Stopping producer with channel pool[active={}, idle={}]", (Object)this.pool.getNumActive(), (Object)this.pool.getNumIdle());
            }
            this.pool.close();
        }
        LOG.debug("Closing {} channels", (Object)this.allChannels.size());
        ChannelGroupFuture future = this.allChannels.close();
        future.awaitUninterruptibly();
        if (this.workerGroup != null) {
            LOG.debug("Stopping worker group: {}", (Object)this.workerGroup);
            this.workerGroup.shutdownGracefully();
            this.workerGroup = null;
        }
        LOG.trace("Stopping correlation manager: {}", (Object)this.correlationManager);
        ServiceHelper.stopService((Object)this.correlationManager);
        LOG.debug("Stopped producer at address: {}", (Object)this.configuration.getAddress());
        super.doStop();
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        if (!this.isRunAllowed()) {
            if (exchange.getException() == null) {
                exchange.setException(new RejectedExecutionException());
            }
            callback.done(true);
            return true;
        }
        try {
            Object body = this.getRequestBody(exchange);
            if (body == null) {
                this.noReplyLogger.log("No payload to send for exchange: " + exchange);
                callback.done(true);
                return true;
            }
            return this.processWithBody(exchange, body, new BodyReleaseCallback(callback, body));
        }
        catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    private boolean processWithBody(Exchange exchange, Object body, BodyReleaseCallback callback) {
        ChannelFuture channelFuture;
        if (this.getConfiguration().getCharsetName() != null) {
            exchange.setProperty(ExchangePropertyKey.CHARSET_NAME, (Object)IOHelper.normalizeCharset(this.getConfiguration().getCharsetName()));
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Pool[active={}, idle={}]", (Object)this.pool.getNumActive(), (Object)this.pool.getNumIdle());
        }
        ChannelOutboundInvoker channel = null;
        try {
            if (this.getConfiguration().isReuseChannel()) {
                channel = exchange.getProperty("CamelNettyChannel", Channel.class);
            }
            if (channel == null) {
                if (this.pool == null) {
                    throw new IllegalStateException("Producer pool is null");
                }
                channelFuture = this.pool.borrowObject();
                if (channelFuture != null) {
                    LOG.trace("Got channel request from pool {}", (Object)channelFuture);
                }
            } else {
                channelFuture = channel.newSucceededFuture();
            }
        }
        catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
        if (channelFuture == null) {
            exchange.setException(new CamelExchangeException("Cannot get channel from pool", exchange));
            callback.done(true);
            return true;
        }
        channelFuture.addListener(new ChannelConnectedListener(exchange, callback, body));
        return false;
    }

    public void processWithConnectedChannel(final Exchange exchange, BodyReleaseCallback callback, final ChannelFuture channelFuture, Object body) {
        final Channel channel = channelFuture.channel();
        if (this.getConfiguration().isReuseChannel() && exchange.getProperty("CamelNettyChannel") == null) {
            channel.attr(CORRELATION_MANAGER_ATTR).set(this.correlationManager);
            exchange.setProperty("CamelNettyChannel", (Object)channel);
            exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter(){

                @Override
                public void onComplete(Exchange exchange) {
                    Boolean close = ExchangeHelper.isOutCapable(exchange) ? exchange.getOut().getHeader("CamelNettyCloseChannelWhenComplete", Boolean.class) : exchange.getIn().getHeader("CamelNettyCloseChannelWhenComplete", Boolean.class);
                    boolean disconnect = NettyProducer.this.getConfiguration().isDisconnect();
                    if (close != null) {
                        disconnect = close;
                    }
                    if (disconnect) {
                        LOG.trace("Closing channel {} as routing the Exchange is done", (Object)channel);
                        NettyHelper.close(channel);
                    }
                    NettyProducer.this.releaseChannel(channelFuture);
                }
            });
        }
        NettyCamelStateCorrelationManager channelCorrelationManager = Optional.ofNullable(channel.attr(CORRELATION_MANAGER_ATTR).get()).orElse(this.correlationManager);
        if (exchange.getIn().getHeader("CamelNettyRequestTimeout") != null) {
            long timeoutInMs = exchange.getIn().getHeader("CamelNettyRequestTimeout", Long.class);
            ChannelHandler oldHandler = channel.pipeline().get("timeout");
            ReadTimeoutHandler newHandler = new ReadTimeoutHandler(timeoutInMs, TimeUnit.MILLISECONDS);
            if (oldHandler == null) {
                channel.pipeline().addBefore("handler", "timeout", newHandler);
            } else {
                channel.pipeline().replace(oldHandler, "timeout", (ChannelHandler)newHandler);
            }
        }
        final AsyncCallback producerCallback = this.configuration.isReuseChannel() ? callback.getOriginalCallback() : new NettyProducerCallback(channelFuture, callback.getOriginalCallback());
        final NettyCamelState state = new NettyCamelState(producerCallback, exchange);
        channelCorrelationManager.putState(channel, state);
        InetSocketAddress remoteAddress = null;
        if (!this.isTcp()) {
            remoteAddress = new InetSocketAddress(this.configuration.getHost(), this.configuration.getPort());
        }
        NettyHelper.writeBodyAsync(LOG, channel, remoteAddress, body, exchange, new ChannelFutureListener(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                LOG.trace("Operation complete {}", (Object)channelFuture);
                if (!channelFuture.isSuccess()) {
                    Throwable cause = null;
                    try {
                        channelFuture.get(0L, TimeUnit.MILLISECONDS);
                    }
                    catch (Exception e) {
                        cause = e.getCause();
                    }
                    if (cause != null) {
                        exchange.setException(cause);
                    }
                    state.onExceptionCaughtOnce(false);
                    return;
                }
                if (!NettyProducer.this.configuration.isSync()) {
                    try {
                        Boolean close = ExchangeHelper.isOutCapable(exchange) ? exchange.getOut().getHeader("CamelNettyCloseChannelWhenComplete", Boolean.class) : exchange.getIn().getHeader("CamelNettyCloseChannelWhenComplete", Boolean.class);
                        boolean disconnect = NettyProducer.this.getConfiguration().isDisconnect();
                        if (close != null) {
                            disconnect = close;
                        }
                        if (!NettyProducer.this.configuration.isReuseChannel() && disconnect) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Closing channel when complete at address: {}", (Object)NettyProducer.this.getEndpoint().getConfiguration().getAddress());
                            }
                            NettyHelper.close(channel);
                        }
                    }
                    finally {
                        producerCallback.done(false);
                    }
                }
            }
        });
    }

    protected Object getRequestBody(Exchange exchange) throws Exception {
        Object body = NettyPayloadHelper.getIn(this.getEndpoint(), exchange);
        if (body == null) {
            return null;
        }
        if (this.getConfiguration().isTextline()) {
            body = NettyHelper.getTextlineBody(body, exchange, this.getConfiguration().getDelimiter(), this.getConfiguration().isAutoAppendDelimiter());
        }
        return body;
    }

    protected EventLoopGroup getWorkerGroup() {
        EventLoopGroup wg = this.configuration.getWorkerGroup();
        if (wg == null) {
            wg = this.workerGroup;
        }
        return wg;
    }

    protected ChannelFuture openConnection() throws Exception {
        ChannelFuture answer;
        if (this.isTcp()) {
            Bootstrap clientBootstrap = new Bootstrap();
            if (this.configuration.isNativeTransport()) {
                clientBootstrap.channel(EpollSocketChannel.class);
            } else {
                clientBootstrap.channel(NioSocketChannel.class);
            }
            clientBootstrap.group(this.getWorkerGroup());
            clientBootstrap.option(ChannelOption.SO_KEEPALIVE, this.configuration.isKeepAlive());
            clientBootstrap.option(ChannelOption.TCP_NODELAY, this.configuration.isTcpNoDelay());
            clientBootstrap.option(ChannelOption.SO_REUSEADDR, this.configuration.isReuseAddress());
            clientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.configuration.getConnectTimeout());
            clientBootstrap.handler(this.pipelineFactory);
            ChannelFuture answer2 = clientBootstrap.connect(new InetSocketAddress(this.configuration.getHost(), this.configuration.getPort()));
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created new TCP client bootstrap connecting to {}:{} with options: {}", new Object[]{this.configuration.getHost(), this.configuration.getPort(), clientBootstrap});
            }
            return answer2;
        }
        Bootstrap connectionlessClientBootstrap = new Bootstrap();
        if (this.configuration.isNativeTransport()) {
            connectionlessClientBootstrap.channel(EpollDatagramChannel.class);
        } else {
            connectionlessClientBootstrap.channel(NioDatagramChannel.class);
        }
        connectionlessClientBootstrap.group(this.getWorkerGroup());
        connectionlessClientBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.configuration.getConnectTimeout());
        connectionlessClientBootstrap.option(ChannelOption.SO_BROADCAST, this.configuration.isBroadcast());
        connectionlessClientBootstrap.option(ChannelOption.SO_SNDBUF, this.configuration.getSendBufferSize());
        connectionlessClientBootstrap.option(ChannelOption.SO_RCVBUF, this.configuration.getReceiveBufferSize());
        connectionlessClientBootstrap.handler(this.pipelineFactory);
        if (!this.configuration.isUdpConnectionlessSending()) {
            answer = connectionlessClientBootstrap.connect(new InetSocketAddress(this.configuration.getHost(), this.configuration.getPort()));
        } else {
            answer = connectionlessClientBootstrap.bind(new InetSocketAddress(0)).sync();
            Channel channel = answer.channel();
            this.allChannels.add(channel);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new UDP client bootstrap connecting to {}:{} with options: {}", new Object[]{this.configuration.getHost(), this.configuration.getPort(), connectionlessClientBootstrap});
        }
        return answer;
    }

    protected void notifyChannelOpen(ChannelFuture channelFuture) throws Exception {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Channel open finished with {}", (Object)channelFuture);
        }
        if (channelFuture.isSuccess()) {
            Channel answer = channelFuture.channel();
            this.allChannels.add(answer);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Creating connector to address: {}", (Object)this.configuration.getAddress());
            }
        }
    }

    protected void releaseChannel(ChannelFuture channelFuture) {
        Channel channel = channelFuture.channel();
        try {
            if (channel.isActive()) {
                LOG.trace("Putting channel back to pool {}", (Object)channel);
                this.pool.returnObject(channelFuture);
            } else {
                LOG.trace("Invalidating channel from pool {}", (Object)channel);
                this.pool.invalidateObject(channelFuture);
            }
        }
        catch (Exception e) {
            LOG.warn("Error returning channel to pool {}. This exception will be ignored.", (Object)channel, (Object)e);
        }
    }

    public NettyConfiguration getConfiguration() {
        return this.configuration;
    }

    public void setConfiguration(NettyConfiguration configuration) {
        this.configuration = configuration;
    }

    public ChannelGroup getAllChannels() {
        return this.allChannels;
    }

    private static final class BodyReleaseCallback
    implements AsyncCallback {
        private volatile Object body;
        private final AsyncCallback originalCallback;

        private BodyReleaseCallback(AsyncCallback originalCallback, Object body) {
            this.body = body;
            this.originalCallback = originalCallback;
        }

        public AsyncCallback getOriginalCallback() {
            return this.originalCallback;
        }

        @Override
        public void done(boolean doneSync) {
            ReferenceCountUtil.release(this.body);
            this.originalCallback.done(doneSync);
        }
    }

    private class ChannelConnectedListener
    implements ChannelFutureListener {
        private final Exchange exchange;
        private final BodyReleaseCallback callback;
        private final Object body;

        ChannelConnectedListener(Exchange exchange, BodyReleaseCallback callback, Object body) {
            this.exchange = exchange;
            this.callback = callback;
            this.body = body;
        }

        @Override
        public void operationComplete(ChannelFuture future) {
            if (!future.isDone() || !future.isSuccess()) {
                ConnectException cause = new ConnectException("Cannot connect to " + NettyProducer.this.configuration.getAddress());
                if (future.cause() != null) {
                    cause.initCause(future.cause());
                }
                this.exchange.setException(cause);
                this.callback.done(false);
                NettyProducer.this.releaseChannel(future);
                return;
            }
            try {
                NettyProducer.this.processWithConnectedChannel(this.exchange, this.callback, future, this.body);
            }
            catch (Exception e) {
                this.exchange.setException(e);
                this.callback.done(false);
            }
        }
    }

    private final class NettyProducerPoolableObjectFactory
    implements PooledObjectFactory<ChannelFuture> {
        private NettyProducer producer;

        public NettyProducerPoolableObjectFactory(NettyProducer producer) {
            this.producer = producer;
        }

        @Override
        public void activateObject(PooledObject<ChannelFuture> p) throws Exception {
            ChannelFuture channelFuture = p.getObject();
            LOG.trace("activateObject channel request: {}", (Object)channelFuture);
            if (channelFuture.isSuccess() && this.producer.getConfiguration().getRequestTimeout() > 0L) {
                LOG.trace("reset the request timeout as we activate the channel");
                Channel channel = channelFuture.channel();
                ChannelHandler handler = channel.pipeline().get("timeout");
                if (handler == null) {
                    ReadTimeoutHandler timeout = new ReadTimeoutHandler(this.producer.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS);
                    channel.pipeline().addBefore("handler", "timeout", timeout);
                }
            }
        }

        @Override
        public void destroyObject(PooledObject<ChannelFuture> p) throws Exception {
            ChannelFuture channelFuture = p.getObject();
            LOG.trace("Destroying channel request: {}", (Object)channelFuture);
            channelFuture.addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Channel channel = future.channel();
                    if (channel.isOpen()) {
                        NettyHelper.close(channel);
                    }
                    NettyProducer.this.allChannels.remove(channel);
                }
            });
            channelFuture.cancel(false);
        }

        @Override
        public void passivateObject(PooledObject<ChannelFuture> p) throws Exception {
            ChannelFuture channelFuture = p.getObject();
            LOG.trace("passivateObject channel request: {}", (Object)channelFuture);
        }

        @Override
        public boolean validateObject(PooledObject<ChannelFuture> p) {
            ChannelFuture channelFuture = p.getObject();
            if (!channelFuture.isDone()) {
                LOG.trace("Validating connecting channel request: {} -> {}", (Object)channelFuture, (Object)true);
                return true;
            }
            if (!channelFuture.isSuccess()) {
                LOG.trace("Validating unsuccessful channel request: {} -> {}", (Object)channelFuture, (Object)false);
                return false;
            }
            Channel channel = channelFuture.channel();
            boolean answer = channel.isActive();
            LOG.trace("Validating channel: {} -> {}", (Object)channel, (Object)answer);
            return answer;
        }

        @Override
        public PooledObject<ChannelFuture> makeObject() throws Exception {
            ChannelFuture channelFuture = NettyProducer.this.openConnection().addListener(new ChannelFutureListener(){

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    NettyProducer.this.notifyChannelOpen(future);
                }
            });
            LOG.trace("Requested channel: {}", (Object)channelFuture);
            return new DefaultPooledObject<ChannelFuture>(channelFuture);
        }
    }

    private final class NettyProducerCallback
    implements AsyncCallback {
        private final ChannelFuture channelFuture;
        private final AsyncCallback callback;

        private NettyProducerCallback(ChannelFuture channelFuture, AsyncCallback callback) {
            this.channelFuture = channelFuture;
            this.callback = callback;
        }

        @Override
        public void done(boolean doneSync) {
            try {
                NettyProducer.this.releaseChannel(this.channelFuture);
            }
            finally {
                this.callback.done(doneSync);
            }
        }
    }
}

