package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
import org.apache.flink.runtime.rest.util.RestConstants;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufInputStream;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.java8.FuturesConvertersImpl;

/* loaded from: input_file:org/apache/flink/runtime/rest/RestClient.class */
public class RestClient {
    private static final Logger LOG = LoggerFactory.getLogger(RestClient.class);
    private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
    private final Executor executor;
    private Bootstrap bootstrap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$ClientHandler.class */
    public static class ClientHandler extends SimpleChannelInboundHandler<Object> {
        private final CompletableFuture<JsonResponse> jsonFuture;

        private ClientHandler() {
            this.jsonFuture = new CompletableFuture<>();
        }

        CompletableFuture<JsonResponse> getJsonFuture() {
            return this.jsonFuture;
        }

        @Override // org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) {
            if (obj instanceof FullHttpResponse) {
                readRawResponse((FullHttpResponse) obj);
            } else {
                RestClient.LOG.error("Implementation error: Received a response that wasn't a FullHttpResponse.");
                if (obj instanceof HttpResponse) {
                    this.jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.", ((HttpResponse) obj).getStatus()));
                } else {
                    this.jsonFuture.completeExceptionally(new RestClientException("Implementation error: Received a response that wasn't a FullHttpResponse.", HttpResponseStatus.INTERNAL_SERVER_ERROR));
                }
            }
            channelHandlerContext.close();
        }

        private void readRawResponse(FullHttpResponse fullHttpResponse) {
            ByteBuf content = fullHttpResponse.content();
            try {
                JsonNode readTree = RestClient.objectMapper.readTree(new ByteBufInputStream(content));
                RestClient.LOG.debug("Received response {}.", readTree);
                this.jsonFuture.complete(new JsonResponse(readTree, fullHttpResponse.getStatus()));
            } catch (JsonParseException e) {
                RestClient.LOG.error("Response was not valid JSON.", e);
                content.readerIndex(0);
                try {
                    ByteBufInputStream byteBufInputStream = new ByteBufInputStream(content);
                    byte[] bArr = new byte[byteBufInputStream.available()];
                    byteBufInputStream.readFully(bArr);
                    String str = new String(bArr);
                    RestClient.LOG.error("Unexpected plain-text response: {}", str);
                    this.jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, but plain-text: " + str, e, fullHttpResponse.getStatus()));
                } catch (IOException e2) {
                    this.jsonFuture.completeExceptionally(new RestClientException("Response was not valid JSON, nor plain-text.", e, fullHttpResponse.getStatus()));
                }
            } catch (IOException e3) {
                RestClient.LOG.error("Response could not be read.", e3);
                this.jsonFuture.completeExceptionally(new RestClientException("Response could not be read.", e3, fullHttpResponse.getStatus()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/rest/RestClient$JsonResponse.class */
    /**
     * Immutable pair of a parsed JSON response body and the HTTP status it was received with.
     * Both components are required to be non-null.
     */
    public static final class JsonResponse {
        private final JsonNode json;
        private final HttpResponseStatus httpResponseStatus;

        private JsonResponse(JsonNode jsonNode, HttpResponseStatus httpResponseStatus) {
            this.json = Preconditions.checkNotNull(jsonNode);
            this.httpResponseStatus = Preconditions.checkNotNull(httpResponseStatus);
        }

        /** Returns the parsed JSON body. */
        public JsonNode getJson() {
            return this.json;
        }

        /** Returns the HTTP status of the response. */
        public HttpResponseStatus getHttpResponseStatus() {
            return this.httpResponseStatus;
        }

        /**
         * Renders both components so that log statements which print a response show its content
         * rather than an identity hash.
         */
        @Override
        public String toString() {
            return "JsonResponse{json=" + this.json + ", httpResponseStatus=" + this.httpResponseStatus + '}';
        }
    }

    /**
     * Creates a client whose connections run on a single-threaded NIO event loop group.
     *
     * @param restClientConfiguration source of the optional SSL engine; must not be null
     * @param executor executor used for connecting and for parsing responses; must not be null
     */
    public RestClient(RestClientConfiguration restClientConfiguration, Executor executor) {
        Preconditions.checkNotNull(restClientConfiguration);
        this.executor = Preconditions.checkNotNull(executor);
        final SSLEngine engine = restClientConfiguration.getSslEngine();
        final ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.rest.RestClient.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer
            public void initChannel(SocketChannel socketChannel) throws Exception {
                // The SSL handler is only installed when the configuration supplies an engine.
                if (engine != null) {
                    socketChannel.pipeline().addLast("ssl", new SslHandler(engine));
                }
                socketChannel.pipeline().addLast(new HttpClientCodec());
                // NOTE(review): the aggregation limit is taken from an HDFS stream constant, which
                // looks like a decompiler mis-attribution of an inlined literal; confirm the value.
                socketChannel.pipeline().addLast(new HttpObjectAggregator(HadoopDataInputStream.MIN_SKIP_BYTES));
                socketChannel.pipeline().addLast(new ClientHandler());
                socketChannel.pipeline().addLast(new PipelineErrorHandler(RestClient.LOG));
            }
        };
        final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-client-netty"));
        final Bootstrap clientBootstrap = new Bootstrap();
        clientBootstrap.group(eventLoopGroup);
        clientBootstrap.channel(NioSocketChannel.class);
        clientBootstrap.handler(pipelineSetup);
        this.bootstrap = clientBootstrap;
        LOG.info("Rest client endpoint started.");
    }

    /**
     * Shuts down the event loop group of this client and waits up to the given timeout for the
     * shutdown to finish. Failures (including a timeout) are logged rather than thrown.
     *
     * @param time maximum time to wait for the shutdown; also passed as the graceful-shutdown timeout
     */
    public void shutdown(Time time) {
        LOG.info("Shutting down rest endpoint.");
        final CompletableFuture<Void> groupFuture = new CompletableFuture<>();
        if (this.bootstrap != null && this.bootstrap.group() != null) {
            this.bootstrap.group().shutdownGracefully(0L, time.toMilliseconds(), TimeUnit.MILLISECONDS).addListener(finished -> {
                // Report the real outcome instead of treating every termination as success.
                if (finished.isSuccess()) {
                    groupFuture.complete(null);
                } else {
                    groupFuture.completeExceptionally(finished.cause());
                }
            });
        } else {
            // Nothing to shut down: complete right away so we neither block for the full timeout
            // nor log a spurious failure.
            groupFuture.complete(null);
        }
        try {
            groupFuture.get(time.toMilliseconds(), TimeUnit.MILLISECONDS);
            LOG.info("Rest endpoint shutdown complete.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.warn("Rest endpoint shutdown failed.", e);
        } catch (Exception e) {
            LOG.warn("Rest endpoint shutdown failed.", e);
        }
    }

    /**
     * Sends a request that carries message parameters but no request body; delegates to the
     * five-argument overload with the shared {@link EmptyRequestBody} instance.
     */
    public <M extends MessageHeaders<EmptyRequestBody, P, U>, U extends MessageParameters, P extends ResponseBody> CompletableFuture<P> sendRequest(String str, int i, M m, U u) throws IOException {
        final EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(str, i, m, u, noBody);
    }

    /**
     * Sends a request that carries a body but no message parameters; delegates to the
     * five-argument overload with the shared {@link EmptyMessageParameters} instance.
     */
    public <M extends MessageHeaders<R, P, EmptyMessageParameters>, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String str, int i, M m, R r) throws IOException {
        final EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        return sendRequest(str, i, m, noParameters, r);
    }

    /**
     * Sends a request with neither message parameters nor a body; delegates to the five-argument
     * overload with the shared empty-parameters and empty-body instances.
     */
    public <M extends MessageHeaders<EmptyRequestBody, P, EmptyMessageParameters>, P extends ResponseBody> CompletableFuture<P> sendRequest(String str, int i, M m) throws IOException {
        final EmptyMessageParameters noParameters = EmptyMessageParameters.getInstance();
        final EmptyRequestBody noBody = EmptyRequestBody.getInstance();
        return sendRequest(str, i, m, noParameters, noBody);
    }

    /**
     * Serializes the request body to JSON, builds an HTTP/1.1 request for the endpoint described
     * by the message headers and submits it to the given host and port.
     *
     * @param str target host; must not be null
     * @param i target port; must satisfy {@code 0 <= i < 65536}
     * @param m message headers providing the endpoint URL, HTTP method and response class
     * @param u message parameters used to resolve the endpoint URL; must already be resolved
     * @param r request body to serialize
     * @return future holding the deserialized response
     * @throws IOException if the request body cannot be serialized
     */
    public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(String str, int i, M m, U u, R r) throws IOException {
        Preconditions.checkNotNull(str);
        // The message states the range that is actually enforced: 0 inclusive, 65536 exclusive.
        Preconditions.checkArgument(0 <= i && i < 65536, "The target port " + i + " is not in the range [0, 65536).");
        Preconditions.checkNotNull(m);
        Preconditions.checkNotNull(r);
        Preconditions.checkNotNull(u);
        Preconditions.checkState(u.isResolved(), "Message parameters were not resolved.");
        final String targetUrl = MessageParameters.resolveUrl(m.getTargetRestEndpointURL(), u);
        LOG.debug("Sending request of class {} to {}", r.getClass(), targetUrl);
        final StringWriter jsonWriter = new StringWriter();
        objectMapper.writeValue(jsonWriter, r);
        final ByteBuf payload = Unpooled.wrappedBuffer(jsonWriter.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
        final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, m.getHttpMethod().getNettyHttpMethod(), targetUrl, payload);
        // Content-Length is the number of bytes that will be written, i.e. the readable bytes.
        httpRequest.headers()
            .add("Content-Length", payload.readableBytes())
            .add("Content-Type", RestConstants.REST_CONTENT_TYPE)
            .set("Host", str + ':' + i)
            .set("Connection", "close");
        return submitRequest(str, i, httpRequest, m.getResponseClass());
    }

    /**
     * Connects to the target, writes the request and returns a future with the parsed response.
     *
     * <p>Connecting (including the blocking wait for the connection) and response parsing run on
     * this client's executor. The raw response is obtained from the {@link ClientHandler}
     * installed in the channel pipeline.
     *
     * @param str target host
     * @param i target port
     * @param fullHttpRequest fully built request to send
     * @param cls class the JSON response is converted to
     * @return future holding the deserialized response, or the failure
     */
    private <P extends ResponseBody> CompletableFuture<P> submitRequest(String str, int i, FullHttpRequest fullHttpRequest, Class<P> cls) {
        return CompletableFuture.supplyAsync(() -> this.bootstrap.connect(str, i), this.executor)
            .thenApply(connectFuture -> {
                try {
                    // Blocks until the connection attempt has finished.
                    return connectFuture.sync();
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the executor thread before wrapping.
                    Thread.currentThread().interrupt();
                    throw new FlinkRuntimeException(e);
                }
            })
            .thenApply(connectFuture -> connectFuture.channel())
            .thenCompose(channel -> {
                // Fetch the response future before writing so the handler reference is in hand
                // by the time the response can arrive.
                ClientHandler handler = channel.pipeline().get(ClientHandler.class);
                CompletableFuture<JsonResponse> jsonFuture = handler.getJsonFuture();
                channel.writeAndFlush(fullHttpRequest);
                return jsonFuture;
            })
            .thenComposeAsync(jsonResponse -> parseResponse(jsonResponse, cls), this.executor);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Converts a raw JSON response into the expected response type.
     *
     * <p>If the JSON does not match the expected type, it is interpreted as an
     * {@link ErrorResponseBody} and the returned future fails with a {@link RestClientException}
     * carrying the reported errors. If that also fails, the future fails with an exception that
     * says the response was neither of the expected type nor an error.
     *
     * @param jsonResponse raw response holding the JSON tree and HTTP status
     * @param cls class to convert the JSON tree to
     * @return an already completed future with the converted response or the failure
     */
    public static <P extends ResponseBody> CompletableFuture<P> parseResponse(JsonResponse jsonResponse, Class<P> cls) {
        final CompletableFuture<P> responseFuture = new CompletableFuture<>();
        final JsonNode json = jsonResponse.getJson();
        try {
            responseFuture.complete(objectMapper.treeToValue(json, cls));
        } catch (JsonProcessingException e) {
            // Not the expected type: try to interpret the payload as an error report instead.
            try {
                ErrorResponseBody error = objectMapper.treeToValue(json, ErrorResponseBody.class);
                responseFuture.completeExceptionally(new RestClientException(error.errors.toString(), jsonResponse.getHttpResponseStatus()));
            } catch (JsonProcessingException e2) {
                LOG.error("Received response was neither of the expected type ({}) nor an error. Response={}", cls, jsonResponse, e2);
                responseFuture.completeExceptionally(new RestClientException("Response was neither of the expected type(" + cls + ") nor an error.", e2, jsonResponse.getHttpResponseStatus()));
            }
        }
        return responseFuture;
    }
}
