package org.apache.flink.runtime.rest;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.net.RedirectingSslHandler;
import org.apache.flink.runtime.net.SSLEngineFactory;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.rest.handler.router.RouterHandler;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpServerCodec;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint.class */
public abstract class RestServerEndpoint implements AutoCloseableAsync {
    private final String restAddress;
    private final String restBindAddress;
    private final int restBindPort;

    @Nullable
    private final SSLEngineFactory sslEngineFactory;
    private final int maxContentLength;
    protected final Path uploadDir;
    protected final Map<String, String> responseHeaders;
    private final CompletableFuture<Void> terminationFuture;
    private ServerBootstrap bootstrap;
    private Channel serverChannel;
    private String restBaseUrl;
    protected final Logger log = LoggerFactory.getLogger(getClass());
    private final Object lock = new Object();
    private State state = State.CREATED;

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint$RestHandlerUrlComparator.class */
    public static final class RestHandlerUrlComparator implements Comparator<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>, Serializable {
        private static final long serialVersionUID = 2388466767835547926L;
        private static final Comparator<String> CASE_INSENSITIVE_ORDER = new CaseInsensitiveOrderComparator();
        static final RestHandlerUrlComparator INSTANCE = new RestHandlerUrlComparator();

        /* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint$RestHandlerUrlComparator$CaseInsensitiveOrderComparator.class */
        public static final class CaseInsensitiveOrderComparator implements Comparator<String>, Serializable {
            private static final long serialVersionUID = 8550835445193437027L;

            @Override // java.util.Comparator
            public int compare(String str, String str2) {
                char upperCase;
                char upperCase2;
                char lowerCase;
                char lowerCase2;
                int length = str.length();
                int length2 = str2.length();
                int min = Math.min(length, length2);
                for (int i = 0; i < min; i++) {
                    char charAt = str.charAt(i);
                    char charAt2 = str2.charAt(i);
                    if (charAt != charAt2 && (upperCase = Character.toUpperCase(charAt)) != (upperCase2 = Character.toUpperCase(charAt2)) && (lowerCase = Character.toLowerCase(upperCase)) != (lowerCase2 = Character.toLowerCase(upperCase2))) {
                        if (lowerCase == ':') {
                            return 1;
                        }
                        if (lowerCase2 == ':') {
                            return -1;
                        }
                        return lowerCase - lowerCase2;
                    }
                }
                return length - length2;
            }
        }

        @Override // java.util.Comparator
        public int compare(Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple2, Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple22) {
            return CASE_INSENSITIVE_ORDER.compare(tuple2.f0.getTargetRestEndpointURL(), tuple22.f0.getTargetRestEndpointURL());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/rest/RestServerEndpoint$State.class */
    private enum State {
        CREATED,
        RUNNING,
        SHUTDOWN
    }

    public RestServerEndpoint(RestServerEndpointConfiguration restServerEndpointConfiguration) throws IOException {
        Preconditions.checkNotNull(restServerEndpointConfiguration);
        this.restAddress = restServerEndpointConfiguration.getRestAddress();
        this.restBindAddress = restServerEndpointConfiguration.getRestBindAddress();
        this.restBindPort = restServerEndpointConfiguration.getRestBindPort();
        this.sslEngineFactory = restServerEndpointConfiguration.getSslEngineFactory();
        this.uploadDir = restServerEndpointConfiguration.getUploadDir();
        createUploadDir(this.uploadDir, this.log);
        this.maxContentLength = restServerEndpointConfiguration.getMaxContentLength();
        this.responseHeaders = restServerEndpointConfiguration.getResponseHeaders();
        this.terminationFuture = new CompletableFuture<>();
    }

    protected abstract List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> completableFuture);

    /* JADX WARN: Type inference failed for: r1v21, types: [org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture] */
    public final void start() throws Exception {
        synchronized (this.lock) {
            Preconditions.checkState(this.state == State.CREATED, "The RestServerEndpoint cannot be restarted.");
            this.log.info("Starting rest endpoint.");
            final Router router = new Router();
            final CompletableFuture<String> completableFuture = new CompletableFuture<>();
            List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers = initializeHandlers(completableFuture);
            Collections.sort(initializeHandlers, RestHandlerUrlComparator.INSTANCE);
            initializeHandlers.forEach(tuple2 -> {
                this.log.debug("Register handler {} under {}@{}.", new Object[]{tuple2.f1, ((RestHandlerSpecification) tuple2.f0).getHttpMethod(), ((RestHandlerSpecification) tuple2.f0).getTargetRestEndpointURL()});
                registerHandler(router, tuple2);
            });
            ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.rest.RestServerEndpoint.1
                /* JADX INFO: Access modifiers changed from: protected */
                @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer
                public void initChannel(SocketChannel socketChannel) {
                    RouterHandler routerHandler = new RouterHandler(router, RestServerEndpoint.this.responseHeaders);
                    if (RestServerEndpoint.this.sslEngineFactory != null) {
                        socketChannel.pipeline().addLast("ssl", new RedirectingSslHandler(RestServerEndpoint.this.restAddress, completableFuture, RestServerEndpoint.this.sslEngineFactory));
                    }
                    socketChannel.pipeline().addLast(new HttpServerCodec()).addLast(new FileUploadHandler(RestServerEndpoint.this.uploadDir)).addLast(new FlinkHttpObjectAggregator(RestServerEndpoint.this.maxContentLength, RestServerEndpoint.this.responseHeaders)).addLast(new ChunkedWriteHandler()).addLast(routerHandler.getName(), routerHandler).addLast(new PipelineErrorHandler(RestServerEndpoint.this.log, RestServerEndpoint.this.responseHeaders));
                }
            };
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("flink-rest-server-netty-boss"));
            NioEventLoopGroup nioEventLoopGroup2 = new NioEventLoopGroup(0, new DefaultThreadFactory("flink-rest-server-netty-worker"));
            this.bootstrap = new ServerBootstrap();
            this.bootstrap.group(nioEventLoopGroup, nioEventLoopGroup2).channel(NioServerSocketChannel.class).childHandler(channelInitializer);
            this.log.debug("Binding rest endpoint to {}:{}.", this.restBindAddress, Integer.valueOf(this.restBindPort));
            this.serverChannel = (this.restBindAddress == null ? this.bootstrap.bind(this.restBindPort) : this.bootstrap.bind(this.restBindAddress, this.restBindPort)).syncUninterruptibly2().channel();
            InetSocketAddress inetSocketAddress = (InetSocketAddress) this.serverChannel.localAddress();
            String hostAddress = inetSocketAddress.getAddress().isAnyLocalAddress() ? this.restAddress : inetSocketAddress.getAddress().getHostAddress();
            int port = inetSocketAddress.getPort();
            this.log.info("Rest endpoint listening at {}:{}", hostAddress, Integer.valueOf(port));
            this.restBaseUrl = (this.sslEngineFactory != null ? WebAppUtils.HTTPS_PREFIX : WebAppUtils.HTTP_PREFIX) + hostAddress + ':' + port;
            completableFuture.complete(this.restBaseUrl);
            this.state = State.RUNNING;
            startInternal();
        }
    }

    protected abstract void startInternal() throws Exception;

    @Nullable
    public InetSocketAddress getServerAddress() {
        synchronized (this.lock) {
            Preconditions.checkState(this.state != State.CREATED, "The RestServerEndpoint has not been started yet.");
            Channel channel = this.serverChannel;
            if (channel != null) {
                try {
                    return (InetSocketAddress) channel.localAddress();
                } catch (Exception e) {
                    this.log.error("Cannot access local server address", e);
                }
            }
            return null;
        }
    }

    public String getRestBaseUrl() {
        String str;
        synchronized (this.lock) {
            Preconditions.checkState(this.state != State.CREATED, "The RestServerEndpoint has not been started yet.");
            str = this.restBaseUrl;
        }
        return str;
    }

    @Override // org.apache.flink.util.AutoCloseableAsync
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            this.log.info("Shutting down rest endpoint.");
            if (this.state == State.RUNNING) {
                shutDownInternal().whenComplete((r4, th) -> {
                    if (th != null) {
                        this.terminationFuture.completeExceptionally(th);
                    } else {
                        this.terminationFuture.complete(null);
                    }
                });
                this.state = State.SHUTDOWN;
            } else if (this.state == State.CREATED) {
                this.terminationFuture.complete(null);
                this.state = State.SHUTDOWN;
            }
            completableFuture = this.terminationFuture;
        }
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> shutDownInternal() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            CompletableFuture completableFuture2 = new CompletableFuture();
            if (this.serverChannel != null) {
                this.serverChannel.close().addListener2(future -> {
                    if (future.isSuccess()) {
                        completableFuture2.complete(null);
                    } else {
                        completableFuture2.completeExceptionally(future.cause());
                    }
                });
                this.serverChannel = null;
            }
            completableFuture = new CompletableFuture<>();
            completableFuture2.thenRun(() -> {
                CompletableFuture completableFuture3 = new CompletableFuture();
                CompletableFuture completableFuture4 = new CompletableFuture();
                Time seconds = Time.seconds(10L);
                if (this.bootstrap != null) {
                    ?? config2 = this.bootstrap.config2();
                    EventLoopGroup group = config2.group();
                    if (group != null) {
                        group.shutdownGracefully(0L, seconds.toMilliseconds(), TimeUnit.MILLISECONDS).addListener2(future2 -> {
                            if (future2.isSuccess()) {
                                completableFuture3.complete(null);
                            } else {
                                completableFuture3.completeExceptionally(future2.cause());
                            }
                        });
                    } else {
                        completableFuture3.complete(null);
                    }
                    EventLoopGroup childGroup = config2.childGroup();
                    if (childGroup != null) {
                        childGroup.shutdownGracefully(0L, seconds.toMilliseconds(), TimeUnit.MILLISECONDS).addListener2(future3 -> {
                            if (future3.isSuccess()) {
                                completableFuture4.complete(null);
                            } else {
                                completableFuture4.completeExceptionally(future3.cause());
                            }
                        });
                    } else {
                        completableFuture4.complete(null);
                    }
                    this.bootstrap = null;
                } else {
                    completableFuture3.complete(null);
                    completableFuture4.complete(null);
                }
                FutureUtils.completeAll(Arrays.asList(completableFuture3, completableFuture4)).whenComplete((r4, th) -> {
                    if (th != null) {
                        completableFuture.completeExceptionally(th);
                    } else {
                        completableFuture.complete(null);
                    }
                });
            });
        }
        return completableFuture;
    }

    private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> tuple2) {
        switch (tuple2.f0.getHttpMethod()) {
            case GET:
                router.addGet(tuple2.f0.getTargetRestEndpointURL(), tuple2.f1);
                return;
            case POST:
                router.addPost(tuple2.f0.getTargetRestEndpointURL(), tuple2.f1);
                return;
            case DELETE:
                router.addDelete(tuple2.f0.getTargetRestEndpointURL(), tuple2.f1);
                return;
            case PATCH:
                router.addPatch(tuple2.f0.getTargetRestEndpointURL(), tuple2.f1);
                return;
            default:
                throw new RuntimeException("Unsupported http method: " + tuple2.f0.getHttpMethod() + '.');
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public static void createUploadDir(Path path, Logger logger) throws IOException {
        if (Files.exists(path, new LinkOption[0])) {
            return;
        }
        logger.warn("Upload directory {} does not exist, or has been deleted externally. Previously uploaded files are no longer available.", path);
        checkAndCreateUploadDir(path, logger);
    }

    private static synchronized void checkAndCreateUploadDir(Path path, Logger logger) throws IOException {
        if (Files.exists(path, new LinkOption[0]) && Files.isWritable(path)) {
            logger.info("Using directory {} for file uploads.", path);
        } else if (Files.isWritable(Files.createDirectories(path, new FileAttribute[0]))) {
            logger.info("Created directory {} for file uploads.", path);
        } else {
            logger.warn("Upload directory {} cannot be created or is not writable.", path);
            throw new IOException(String.format("Upload directory %s cannot be created or is not writable.", path));
        }
    }
}
