/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.messaging.tcp.reactor;

import io.netty5.buffer.Buffer;
import io.netty5.channel.ChannelHandlerContext;
import io.netty5.channel.group.ChannelGroup;
import io.netty5.channel.group.DefaultChannelGroup;
import io.netty5.handler.codec.ByteToMessageDecoder;
import io.netty5.util.concurrent.EventExecutor;
import io.netty5.util.concurrent.ImmediateEventExecutor;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.ReactorNetty2TcpConnection;
import org.springframework.messaging.tcp.reactor.TcpMessageCodec;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty5.DisposableChannel;
import reactor.netty5.NettyInbound;
import reactor.netty5.NettyOutbound;
import reactor.netty5.resources.ConnectionProvider;
import reactor.netty5.resources.LoopResources;
import reactor.netty5.tcp.TcpClient;
import reactor.util.retry.Retry;

public class ReactorNetty2TcpClient<P>
implements TcpOperations<P> {
    private static final int PUBLISH_ON_BUFFER_SIZE = 16;
    private final TcpClient tcpClient;
    private final TcpMessageCodec<P> codec;
    @Nullable
    private final ChannelGroup channelGroup;
    @Nullable
    private final LoopResources loopResources;
    @Nullable
    private final ConnectionProvider poolResources;
    private final Scheduler scheduler = Schedulers.newParallel((String)"tcp-client-scheduler");
    private Log logger = LogFactory.getLog(ReactorNetty2TcpClient.class);
    private volatile boolean stopping;

    public ReactorNetty2TcpClient(String host, int port, TcpMessageCodec<P> codec) {
        Assert.notNull((Object)host, "host is required");
        Assert.notNull(codec, "ReactorNettyCodec is required");
        this.channelGroup = new DefaultChannelGroup((EventExecutor)ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create((String)"tcp-client-loop");
        this.poolResources = ConnectionProvider.create((String)"tcp-client-pool", (int)10000);
        this.codec = codec;
        this.tcpClient = TcpClient.create((ConnectionProvider)this.poolResources).host(host).port(port).runOn(this.loopResources, false).doOnConnected(conn -> this.channelGroup.add((Object)conn.channel()));
    }

    public ReactorNetty2TcpClient(Function<TcpClient, TcpClient> clientConfigurer, TcpMessageCodec<P> codec) {
        Assert.notNull(codec, "ReactorNettyCodec is required");
        this.channelGroup = new DefaultChannelGroup((EventExecutor)ImmediateEventExecutor.INSTANCE);
        this.loopResources = LoopResources.create((String)"tcp-client-loop");
        this.poolResources = ConnectionProvider.create((String)"tcp-client-pool", (int)10000);
        this.codec = codec;
        this.tcpClient = clientConfigurer.apply(TcpClient.create((ConnectionProvider)this.poolResources).runOn(this.loopResources, false).doOnConnected(conn -> this.channelGroup.add((Object)conn.channel())));
    }

    public ReactorNetty2TcpClient(TcpClient tcpClient, TcpMessageCodec<P> codec) {
        Assert.notNull((Object)tcpClient, "TcpClient is required");
        Assert.notNull(codec, "ReactorNettyCodec is required");
        this.tcpClient = tcpClient;
        this.codec = codec;
        this.channelGroup = null;
        this.loopResources = null;
        this.poolResources = null;
    }

    public void setLogger(Log logger) {
        this.logger = logger;
    }

    public Log getLogger() {
        return this.logger;
    }

    @Override
    public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler) {
        Assert.notNull(handler, "TcpConnectionHandler is required");
        if (this.stopping) {
            return this.handleShuttingDownConnectFailure(handler);
        }
        return this.extendTcpClient(this.tcpClient, handler).handle((BiFunction)new ReactorNettyHandler(handler)).connect().doOnError(handler::afterConnectFailure).then().toFuture();
    }

    protected TcpClient extendTcpClient(TcpClient tcpClient, TcpConnectionHandler<P> handler) {
        return tcpClient;
    }

    @Override
    public CompletableFuture<Void> connectAsync(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
        Assert.notNull(handler, "TcpConnectionHandler is required");
        Assert.notNull((Object)strategy, "ReconnectStrategy is required");
        if (this.stopping) {
            return this.handleShuttingDownConnectFailure(handler);
        }
        CompletableFuture<Void> connectFuture = new CompletableFuture<Void>();
        this.extendTcpClient(this.tcpClient, handler).handle((BiFunction)new ReactorNettyHandler(handler)).connect().doOnNext(conn -> connectFuture.complete(null)).doOnError(connectFuture::completeExceptionally).doOnError(handler::afterConnectFailure).flatMap(DisposableChannel::onDispose).retryWhen(Retry.from(signals -> signals.map(retrySignal -> (int)retrySignal.totalRetriesInARow()).flatMap(attempt -> this.reconnect((Integer)attempt, strategy)))).repeatWhen(flux -> flux.scan((Object)1, (count, element) -> {
            Integer n = count;
            count = count + 1;
            return n;
        }).flatMap(attempt -> this.reconnect((Integer)attempt, strategy))).subscribe();
        return connectFuture;
    }

    private CompletableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler<P> handler) {
        IllegalStateException ex = new IllegalStateException("Shutting down.");
        handler.afterConnectFailure(ex);
        return Mono.error((Throwable)ex).toFuture();
    }

    private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) {
        Long time = reconnectStrategy.getTimeToNextAttempt(attempt);
        return time != null ? Mono.delay((Duration)Duration.ofMillis(time), (Scheduler)this.scheduler) : Mono.empty();
    }

    @Override
    public CompletableFuture<Void> shutdownAsync() {
        Mono result;
        if (this.stopping) {
            return CompletableFuture.completedFuture(null);
        }
        this.stopping = true;
        if (this.channelGroup != null) {
            Sinks.Empty channnelGroupCloseSink = Sinks.empty();
            this.channelGroup.close().addListener(future -> channnelGroupCloseSink.tryEmitEmpty());
            result = channnelGroupCloseSink.asMono();
            if (this.loopResources != null) {
                result = result.onErrorComplete().then(this.loopResources.disposeLater());
            }
            if (this.poolResources != null) {
                result = result.onErrorComplete().then(this.poolResources.disposeLater());
            }
            result = result.onErrorComplete().then(this.stopScheduler());
        } else {
            result = this.stopScheduler();
        }
        return result.toFuture();
    }

    private Mono<Void> stopScheduler() {
        return Mono.fromRunnable(() -> {
            this.scheduler.dispose();
            for (int i2 = 0; i2 < 20 && !this.scheduler.isDisposed(); ++i2) {
                try {
                    Thread.sleep(100L);
                    continue;
                }
                catch (Throwable ex) {
                    break;
                }
            }
        });
    }

    public String toString() {
        return "ReactorNetty2TcpClient[" + this.tcpClient + "]";
    }

    private class ReactorNettyHandler
    implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {
        private final TcpConnectionHandler<P> connectionHandler;

        ReactorNettyHandler(TcpConnectionHandler<P> handler) {
            this.connectionHandler = handler;
        }

        @Override
        public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
            inbound.withConnection(conn -> {
                if (ReactorNetty2TcpClient.this.logger.isDebugEnabled()) {
                    ReactorNetty2TcpClient.this.logger.debug("Connected to " + conn.address());
                }
            });
            Sinks.Empty completionSink = Sinks.empty();
            ReactorNetty2TcpConnection connection = new ReactorNetty2TcpConnection(inbound, outbound, ReactorNetty2TcpClient.this.codec, (Sinks.Empty<Void>)completionSink);
            ReactorNetty2TcpClient.this.scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
            inbound.withConnection(conn -> conn.addHandlerFirst(new StompMessageDecoder(ReactorNetty2TcpClient.this.codec)));
            inbound.receiveObject().cast(Message.class).publishOn(ReactorNetty2TcpClient.this.scheduler, 16).subscribe(this.connectionHandler::handleMessage, this.connectionHandler::handleFailure, this.connectionHandler::afterConnectionClosed);
            return completionSink.asMono();
        }
    }

    private static class StompMessageDecoder<P>
    extends ByteToMessageDecoder {
        private final TcpMessageCodec<P> codec;

        StompMessageDecoder(TcpMessageCodec<P> codec) {
            this.codec = codec;
        }

        protected void decode(ChannelHandlerContext ctx, Buffer buffer) throws Exception {
            ByteBuffer byteBuffer = ByteBuffer.allocate(buffer.readableBytes());
            buffer.readBytes(byteBuffer);
            byteBuffer.flip();
            List<Message<P>> messages = this.codec.decode(byteBuffer);
            for (Message<P> message : messages) {
                ctx.fireChannelRead(message);
            }
        }
    }
}

