/*
 * Decompiled with CFR 0.152.
 */
package com.azure.core.http.netty;

import com.azure.core.http.HttpClient;
import com.azure.core.http.HttpHeader;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.netty.implementation.NettyAsyncHttpBufferedResponse;
import com.azure.core.http.netty.implementation.NettyAsyncHttpResponse;
import com.azure.core.http.netty.implementation.NettyToAzureCoreHttpHeadersWrapper;
import com.azure.core.http.netty.implementation.ReadTimeoutHandler;
import com.azure.core.http.netty.implementation.ResponseTimeoutHandler;
import com.azure.core.http.netty.implementation.Utility;
import com.azure.core.http.netty.implementation.WriteTimeoutHandler;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.proxy.ProxyConnectException;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BiFunction;
import javax.net.ssl.SSLException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyOutbound;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.retry.Retry;

class NettyAsyncHttpClient
implements HttpClient {
    private static final String AZURE_EAGERLY_READ_RESPONSE = "azure-eagerly-read-response";
    private static final String AZURE_RESPONSE_TIMEOUT = "azure-response-timeout";
    final boolean disableBufferCopy;
    final long readTimeout;
    final long writeTimeout;
    final long responseTimeout;
    final reactor.netty.http.client.HttpClient nettyClient;

    NettyAsyncHttpClient(reactor.netty.http.client.HttpClient nettyClient, boolean disableBufferCopy, long readTimeout, long writeTimeout, long responseTimeout) {
        this.nettyClient = nettyClient;
        this.disableBufferCopy = disableBufferCopy;
        this.readTimeout = readTimeout;
        this.writeTimeout = writeTimeout;
        this.responseTimeout = responseTimeout;
    }

    @Override
    public Mono<HttpResponse> send(HttpRequest request) {
        return this.send(request, Context.NONE);
    }

    @Override
    public Mono<HttpResponse> send(HttpRequest request, Context context) {
        Objects.requireNonNull(request.getHttpMethod(), "'request.getHttpMethod()' cannot be null.");
        Objects.requireNonNull(request.getUrl(), "'request.getUrl()' cannot be null.");
        Objects.requireNonNull(request.getUrl().getProtocol(), "'request.getUrl().getProtocol()' cannot be null.");
        boolean effectiveEagerlyReadResponse = (Boolean)context.getData(AZURE_EAGERLY_READ_RESPONSE).orElse(false);
        long effectiveResponseTimeout = context.getData(AZURE_RESPONSE_TIMEOUT).filter(timeoutDuration -> timeoutDuration instanceof Duration).map(timeoutDuration -> ((Duration)timeoutDuration).toMillis()).orElse(this.responseTimeout);
        return ((HttpClient.RequestSender)this.nettyClient.doOnRequest((r, connection) -> NettyAsyncHttpClient.addWriteTimeoutHandler(connection, this.writeTimeout)).doAfterRequest((r, connection) -> NettyAsyncHttpClient.addResponseTimeoutHandler(connection, effectiveResponseTimeout)).doOnResponse((response, connection) -> NettyAsyncHttpClient.addReadTimeoutHandler(connection, this.readTimeout)).doAfterResponseSuccess((response, connection) -> NettyAsyncHttpClient.removeReadTimeoutHandler(connection)).request(HttpMethod.valueOf(request.getHttpMethod().toString())).uri(request.getUrl().toString())).send(NettyAsyncHttpClient.bodySendDelegate(request)).responseConnection(NettyAsyncHttpClient.responseDelegate(request, this.disableBufferCopy, effectiveEagerlyReadResponse)).single().onErrorMap(throwable -> {
            if (throwable instanceof SSLException && throwable.getCause() instanceof ProxyConnectException) {
                return throwable.getCause();
            }
            return throwable;
        }).retryWhen(Retry.max(1L).filter(throwable -> throwable instanceof ProxyConnectException).onRetryExhaustedThrow((ignoredSpec, signal) -> signal.failure()));
    }

    private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bodySendDelegate(HttpRequest restRequest) {
        return (reactorNettyRequest, reactorNettyOutbound) -> {
            for (HttpHeader hdr : restRequest.getHeaders()) {
                if (reactorNettyRequest.requestHeaders().contains(hdr.getName())) {
                    boolean first = true;
                    for (String value2 : hdr.getValuesList()) {
                        if (first) {
                            first = false;
                            reactorNettyRequest.header(hdr.getName(), value2);
                            continue;
                        }
                        reactorNettyRequest.addHeader(hdr.getName(), value2);
                    }
                    continue;
                }
                hdr.getValuesList().forEach(value -> reactorNettyRequest.addHeader(hdr.getName(), (CharSequence)value));
            }
            if (restRequest.getBody() != null) {
                Flux<ByteBuf> nettyByteBufFlux = restRequest.getBody().map(Unpooled::wrappedBuffer);
                return reactorNettyOutbound.send(nettyByteBufFlux);
            }
            return reactorNettyOutbound;
        };
    }

    private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse>> responseDelegate(HttpRequest restRequest, boolean disableBufferCopy, boolean eagerlyReadResponse) {
        return (reactorNettyResponse, reactorNettyConnection) -> {
            if (eagerlyReadResponse) {
                return FluxUtil.collectBytesFromNetworkResponse(reactorNettyConnection.inbound().receive().asByteBuffer(), new NettyToAzureCoreHttpHeadersWrapper(reactorNettyResponse.responseHeaders())).doFinally(ignored -> Utility.closeConnection(reactorNettyConnection)).map(bytes -> new NettyAsyncHttpBufferedResponse((HttpClientResponse)reactorNettyResponse, restRequest, (byte[])bytes));
            }
            return Mono.just(new NettyAsyncHttpResponse((HttpClientResponse)reactorNettyResponse, (Connection)reactorNettyConnection, restRequest, disableBufferCopy));
        };
    }

    private static void addWriteTimeoutHandler(Connection connection, long timeoutMillis) {
        connection.addHandlerLast("azureWriteTimeoutHandler", new WriteTimeoutHandler(timeoutMillis));
    }

    private static void addResponseTimeoutHandler(Connection connection, long timeoutMillis) {
        connection.removeHandler("azureWriteTimeoutHandler").addHandlerLast("azureResponseTimeoutHandler", new ResponseTimeoutHandler(timeoutMillis));
    }

    private static void addReadTimeoutHandler(Connection connection, long timeoutMillis) {
        connection.removeHandler("azureResponseTimeoutHandler").addHandlerLast("azureReadTimeoutHandler", new ReadTimeoutHandler(timeoutMillis));
    }

    private static void removeReadTimeoutHandler(Connection connection) {
        connection.removeHandler("azureReadTimeoutHandler");
    }
}

