/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.common.config;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.function.ObjIntConsumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.connector.pulsar.common.config.PulsarConfiguration;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

/**
 * Static factory that builds a {@link PulsarClient} from a {@link PulsarConfiguration}.
 *
 * <p>This class is not instantiable; all members are static.
 */
@Internal
public final class PulsarClientFactory {
    private PulsarClientFactory() {
        // Utility class, no instances.
    }

    /**
     * Creates a {@link PulsarClient} by copying the options found in the given configuration onto a
     * fresh {@link ClientBuilder}.
     *
     * @param configuration the connector configuration to read the client options from
     * @return the client produced by {@link ClientBuilder#build()}
     * @throws PulsarClientException if the authentication or the client itself cannot be created
     * @throws IllegalArgumentException if the authentication parameters are missing or a bind/proxy
     *     address is malformed
     */
    public static PulsarClient createClient(PulsarConfiguration configuration) throws PulsarClientException {
        ClientBuilder builder = PulsarClient.builder();

        // The request timeout is applied through the generic loadConf map rather than a dedicated
        // builder method, and it is applied first so the explicit setters below run afterwards.
        Integer requestTimeoutMs = (Integer)configuration.get(PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS);
        builder.loadConf(Collections.singletonMap("requestTimeoutMs", requestTimeoutMs));

        builder.authentication(PulsarClientFactory.createAuthentication(configuration));

        // Each useOption call hands the option value to the matching builder setter.
        // NOTE(review): useOption presumably skips options that are not configured - confirm in PulsarConfiguration.
        configuration.useOption(PulsarOptions.PULSAR_SERVICE_URL, builder::serviceUrl);
        configuration.useOption(PulsarOptions.PULSAR_LISTENER_NAME, builder::listenerName);
        configuration.useOption(PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS, timeout -> builder.operationTimeout((int)timeout, TimeUnit.MILLISECONDS));
        configuration.useOption(PulsarOptions.PULSAR_LOOKUP_TIMEOUT_MS, timeout -> builder.lookupTimeout((int)timeout, TimeUnit.MILLISECONDS));
        configuration.useOption(PulsarOptions.PULSAR_NUM_IO_THREADS, builder::ioThreads);
        configuration.useOption(PulsarOptions.PULSAR_NUM_LISTENER_THREADS, builder::listenerThreads);
        configuration.useOption(PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER, builder::connectionsPerBroker);
        configuration.useOption(PulsarOptions.PULSAR_CONNECTION_MAX_IDLE_SECONDS, builder::connectionMaxIdleSeconds);
        configuration.useOption(PulsarOptions.PULSAR_USE_TCP_NO_DELAY, builder::enableTcpNoDelay);

        // TLS related options.
        configuration.useOption(PulsarOptions.PULSAR_TLS_KEY_FILE_PATH, builder::tlsKeyFilePath);
        configuration.useOption(PulsarOptions.PULSAR_TLS_CERTIFICATE_FILE_PATH, builder::tlsCertificateFilePath);
        configuration.useOption(PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH, builder::tlsTrustCertsFilePath);
        configuration.useOption(PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION, builder::allowTlsInsecureConnection);
        configuration.useOption(PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE, builder::enableTlsHostnameVerification);
        configuration.useOption(PulsarOptions.PULSAR_USE_KEY_STORE_TLS, builder::useKeyStoreTls);
        configuration.useOption(PulsarOptions.PULSAR_SSL_PROVIDER, builder::sslProvider);
        configuration.useOption(PulsarOptions.PULSAR_TLS_KEY_STORE_TYPE, builder::tlsKeyStoreType);
        configuration.useOption(PulsarOptions.PULSAR_TLS_KEY_STORE_PATH, builder::tlsKeyStorePath);
        configuration.useOption(PulsarOptions.PULSAR_TLS_KEY_STORE_PASSWORD, builder::tlsKeyStorePassword);
        configuration.useOption(PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE, builder::tlsTrustStoreType);
        configuration.useOption(PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH, builder::tlsTrustStorePath);
        configuration.useOption(PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD, builder::tlsTrustStorePassword);
        // The configured values are converted into a TreeSet before being passed to the builder.
        configuration.useOption(PulsarOptions.PULSAR_TLS_CIPHERS, TreeSet::new, builder::tlsCiphers);
        configuration.useOption(PulsarOptions.PULSAR_TLS_PROTOCOLS, TreeSet::new, builder::tlsProtocols);

        // Resource limits, lookup behaviour and connection tuning.
        configuration.useOption(PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES, bytes -> builder.memoryLimit((long)bytes, SizeUnit.BYTES));
        configuration.useOption(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, v -> builder.statsInterval((long)v, TimeUnit.SECONDS));
        configuration.useOption(PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST, builder::maxConcurrentLookupRequests);
        configuration.useOption(PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST, builder::maxLookupRequests);
        configuration.useOption(PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS, builder::maxLookupRedirects);
        configuration.useOption(PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION, builder::maxNumberOfRejectedRequestPerConnection);
        configuration.useOption(PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS, v -> builder.keepAliveInterval((int)v, TimeUnit.SECONDS));
        configuration.useOption(PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS, v -> builder.connectionTimeout((int)v, TimeUnit.MILLISECONDS));
        configuration.useOption(PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS, v -> builder.startingBackoffInterval((long)v, TimeUnit.NANOSECONDS));
        configuration.useOption(PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS, v -> builder.maxBackoffInterval((long)v, TimeUnit.NANOSECONDS));
        configuration.useOption(PulsarOptions.PULSAR_ENABLE_BUSY_WAIT, builder::enableBusyWait);

        // The proxy protocol is only read when a proxy service URL is configured.
        if (configuration.contains(PulsarOptions.PULSAR_PROXY_SERVICE_URL)) {
            String proxyServiceUrl = (String)configuration.get(PulsarOptions.PULSAR_PROXY_SERVICE_URL);
            ProxyProtocol proxyProtocol = (ProxyProtocol)((Object)configuration.get(PulsarOptions.PULSAR_PROXY_PROTOCOL));
            builder.proxyServiceUrl(proxyServiceUrl, proxyProtocol);
        }
        configuration.useOption(PulsarOptions.PULSAR_ENABLE_TRANSACTION, builder::enableTransaction);

        // The DNS bind address may omit the port (port 0 is passed then); the SOCKS5 proxy may not.
        PulsarClientFactory.bindAddress(configuration, PulsarOptions.PULSAR_DNS_LOOKUP_BIND_ADDRESS, true, builder::dnsLookupBind);
        PulsarClientFactory.bindAddress(configuration, PulsarOptions.PULSAR_SOCKS5_PROXY_ADDRESS, false, (host, port) -> {
            builder.socks5ProxyAddress(new InetSocketAddress((String)host, port));
            // Credentials are only applied when a SOCKS5 proxy address is configured.
            configuration.useOption(PulsarOptions.PULSAR_SOCKS5_PROXY_USERNAME, builder::socks5ProxyUsername);
            configuration.useOption(PulsarOptions.PULSAR_SOCKS5_PROXY_PASSWORD, builder::socks5ProxyPassword);
        });
        return builder.build();
    }

    /**
     * Creates the {@link Authentication} described by the configuration.
     *
     * <p>When no auth plugin class name is configured, {@link AuthenticationDisabled#INSTANCE} is
     * returned. Otherwise the auth params string takes precedence over the auth param map.
     *
     * @param configuration the connector configuration to read the authentication options from
     * @return the authentication instance to install on the client builder
     * @throws PulsarClientException if {@link AuthenticationFactory} fails to create the plugin
     * @throws IllegalArgumentException if a plugin class is configured but neither the params
     *     string nor a non-empty param map is provided
     */
    private static Authentication createAuthentication(PulsarConfiguration configuration) throws PulsarClientException {
        if (configuration.contains(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME)) {
            String authPluginClassName = (String)configuration.get(PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME);
            if (configuration.contains(PulsarOptions.PULSAR_AUTH_PARAMS)) {
                String authParamsString = (String)configuration.get(PulsarOptions.PULSAR_AUTH_PARAMS);
                return AuthenticationFactory.create(authPluginClassName, authParamsString);
            }
            // Fall back to the map form of the parameters.
            Map<String, String> paramsMap = configuration.getProperties(PulsarOptions.PULSAR_AUTH_PARAM_MAP);
            if (paramsMap.isEmpty()) {
                throw new IllegalArgumentException(String.format("No %s or %s provided", PulsarOptions.PULSAR_AUTH_PARAMS.key(), PulsarOptions.PULSAR_AUTH_PARAM_MAP.key()));
            }
            return AuthenticationFactory.create(authPluginClassName, paramsMap);
        }
        return AuthenticationDisabled.INSTANCE;
    }

    /**
     * Parses a {@code host[:port]} option and passes the host and port to the given setter.
     *
     * <p>Nothing happens when the option is not contained in the configuration. Only the first two
     * colon separated segments of the value are used.
     *
     * @param configuration the connector configuration to read the address from
     * @param option the option holding the address string
     * @param allowRandomPort whether a value without a port is accepted; the setter then receives
     *     port {@code 0}
     * @param setter receives the parsed host and port
     * @throws IllegalArgumentException if the port is missing (and required), empty, or not an int
     */
    private static void bindAddress(PulsarConfiguration configuration, ConfigOption<String> option, boolean allowRandomPort, ObjIntConsumer<String> setter) {
        if (!configuration.contains(option)) {
            return;
        }
        String address = (String)configuration.get(option);
        if (address.contains(":")) {
            // String.split drops trailing empty strings: "host:" yields one element and ":" yields
            // none, so the length must be checked before indexing.
            String[] addresses = address.split(":");
            if (addresses.length < 2) {
                throw new IllegalArgumentException("The address '" + address + "' should be in host:port format.");
            }
            String host = addresses[0];
            int port;
            try {
                port = Integer.parseInt(addresses[1]);
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Invalid address '" + address + "', port should be int.", e);
            }
            // Invoked outside the try block so that failures inside the setter are not reported as a bad port.
            setter.accept(host, port);
        } else if (allowRandomPort) {
            setter.accept(address, 0);
        } else {
            throw new IllegalArgumentException("The address '" + address + "' should be in host:port format.");
        }
    }
}

