package com.couchbase.client.core.config;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.config.BucketConfigUpdatedEvent;
import com.couchbase.client.core.cnc.events.config.BucketOpenRetriedEvent;
import com.couchbase.client.core.cnc.events.config.CollectionMapRefreshFailedEvent;
import com.couchbase.client.core.cnc.events.config.CollectionMapRefreshIgnoredEvent;
import com.couchbase.client.core.cnc.events.config.CollectionMapRefreshSucceededEvent;
import com.couchbase.client.core.cnc.events.config.ConfigIgnoredEvent;
import com.couchbase.client.core.cnc.events.config.ConfigPushFailedEvent;
import com.couchbase.client.core.cnc.events.config.DnsSrvRefreshAttemptCompletedEvent;
import com.couchbase.client.core.cnc.events.config.DnsSrvRefreshAttemptFailedEvent;
import com.couchbase.client.core.cnc.events.config.GlobalConfigRetriedEvent;
import com.couchbase.client.core.cnc.events.config.GlobalConfigUpdatedEvent;
import com.couchbase.client.core.cnc.events.config.IndividualGlobalConfigLoadFailedEvent;
import com.couchbase.client.core.cnc.events.config.SeedNodesUpdateFailedEvent;
import com.couchbase.client.core.cnc.events.config.SeedNodesUpdatedEvent;
import com.couchbase.client.core.config.loader.ClusterManagerBucketLoader;
import com.couchbase.client.core.config.loader.GlobalLoader;
import com.couchbase.client.core.config.loader.KeyValueBucketLoader;
import com.couchbase.client.core.config.refresher.ClusterManagerBucketRefresher;
import com.couchbase.client.core.config.refresher.GlobalRefresher;
import com.couchbase.client.core.config.refresher.KeyValueBucketRefresher;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.error.AlreadyShutdownException;
import com.couchbase.client.core.error.BucketNotFoundDuringLoadException;
import com.couchbase.client.core.error.BucketNotReadyDuringLoadException;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.NoAccessDuringConfigLoadException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.SeedNodeOutdatedException;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.UnsupportedConfigMechanismException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.io.CollectionMap;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.GetCollectionIdRequest;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.topology.ClusterTopology;
import com.couchbase.client.core.topology.NetworkSelector;
import com.couchbase.client.core.topology.PortSelector;
import com.couchbase.client.core.topology.TopologyParser;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.ConnectionStringUtil;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.core.util.UnsignedLEB128;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.naming.NamingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/couchbase/client/core/config/DefaultConfigurationProvider.class */
public class DefaultConfigurationProvider implements ConfigurationProvider {
    private static final Logger log = LoggerFactory.getLogger(DefaultConfigurationProvider.class);
    private static final Duration MIN_TIME_BETWEEN_DNS_LOOKUPS = Duration.ofSeconds(10);
    private static final int DEFAULT_KV_PORT = 11210;
    private static final int DEFAULT_MANAGER_PORT = 8091;
    private static final int DEFAULT_KV_TLS_PORT = 11207;
    private static final int DEFAULT_MANAGER_TLS_PORT = 18091;
    private static final int MAX_PARALLEL_LOADERS = 5;
    private final Core core;
    private final EventBus eventBus;

    @Nullable
    private volatile TopologyParser topologyParser;
    private final KeyValueBucketLoader keyValueLoader;
    private final ClusterManagerBucketLoader clusterManagerLoader;
    private final KeyValueBucketRefresher keyValueRefresher;
    private final ClusterManagerBucketRefresher clusterManagerRefresher;
    private final GlobalLoader globalLoader;
    private final GlobalRefresher globalRefresher;
    private final Disposable seedNodeResolver;
    private final ConnectionString connectionString;
    private final Sinks.Many<ClusterConfig> configsSink = Sinks.many().replay().latest();
    private final Flux<ClusterConfig> configs = this.configsSink.asFlux();
    private final ClusterConfig currentConfig = new ClusterConfig();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final CollectionMap collectionMap = new CollectionMap();
    private volatile boolean globalConfigLoadInProgress = false;
    private final AtomicInteger bucketConfigLoadInProgress = new AtomicInteger();
    final Set<CollectionIdentifier> collectionMapRefreshInProgress = ConcurrentHashMap.newKeySet();
    private final AtomicReference<Set<SeedNode>> currentSeedNodes = new AtomicReference<>(Collections.emptySet());
    private final Sinks.Many<Set<SeedNode>> seedNodesSink = Sinks.many().replay().latest();
    private final Flux<Set<SeedNode>> seedNodes = this.seedNodesSink.asFlux();
    private volatile NanoTimestamp lastDnsSrvLookup = NanoTimestamp.never();
    private final Sinks.Many<Long> configPollTrigger = Sinks.many().multicast().directBestEffort();

    public DefaultConfigurationProvider(Core core, ConnectionString connectionString) {
        this.core = core;
        this.eventBus = core.context().environment().eventBus();
        this.connectionString = (ConnectionString) Objects.requireNonNull(connectionString);
        this.seedNodeResolver = launchSeedNodeResolver(connectionString, core.environment());
        this.keyValueLoader = new KeyValueBucketLoader(core);
        this.clusterManagerLoader = new ClusterManagerBucketLoader(core);
        this.keyValueRefresher = new KeyValueBucketRefresher(this, core);
        this.clusterManagerRefresher = new ClusterManagerBucketRefresher(this, core);
        this.globalLoader = new GlobalLoader(core);
        this.globalRefresher = new GlobalRefresher(this, core);
        this.configsSink.emitNext(this.currentConfig, Reactor.emitFailureHandler());
    }

    private Disposable launchSeedNodeResolver(ConnectionString connectionString, CoreEnvironment coreEnvironment) {
        return Mono.fromRunnable(() -> {
            Set<SeedNode> seedNodesFromConnectionString = ConnectionStringUtil.seedNodesFromConnectionString(connectionString, coreEnvironment.ioConfig().dnsSrvEnabled(), coreEnvironment.securityConfig().tlsEnabled(), coreEnvironment.eventBus());
            this.topologyParser = createTopologyParser(this.core.environment(), seedNodesFromConnectionString);
            setSeedNodes(seedNodesFromConnectionString).orThrow();
            log.info("Resolved seed nodes: {}", RedactableArgument.redactSystem(seedNodesFromConnectionString));
        }).subscribeOn(Schedulers.boundedElastic()).doOnError(th -> {
            log.warn("Seed node resolution failed. Will try again after a brief delay. Cause:", th);
        }).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(10L)).maxBackoff(Duration.ofSeconds(60L)).jitter(1.0d)).subscribe();
    }

    private static TopologyParser createTopologyParser(CoreEnvironment coreEnvironment, Set<SeedNode> set) {
        boolean tlsEnabled = coreEnvironment.securityConfig().tlsEnabled();
        return new TopologyParser(NetworkSelector.create(coreEnvironment.ioConfig().networkResolution(), makeDefaultPortsExplicitForNetworkDetection(set, tlsEnabled)), tlsEnabled ? PortSelector.TLS : PortSelector.NON_TLS, coreEnvironment.ioConfig().memcachedHashingStrategy());
    }

    private static Set<SeedNode> makeDefaultPortsExplicitForNetworkDetection(Set<SeedNode> set, boolean z) {
        return (Set) set.stream().map(seedNode -> {
            if (seedNode.kvPort().isPresent() || seedNode.clusterManagerPort().isPresent()) {
                return seedNode;
            }
            return seedNode.withKvPort(Integer.valueOf(z ? DEFAULT_KV_TLS_PORT : DEFAULT_KV_PORT)).withManagerPort(Integer.valueOf(z ? DEFAULT_MANAGER_TLS_PORT : 8091));
        }).collect(Collectors.toSet());
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public CollectionMap collectionMap() {
        return this.collectionMap;
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Flux<ClusterConfig> configs() {
        return this.configs;
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public ClusterConfig config() {
        return this.currentConfig;
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Flux<Set<SeedNode>> seedNodes() {
        return this.seedNodes;
    }

    private Mono<Set<SeedNode>> waitForSeedNodes() {
        return this.seedNodes.next().switchIfEmpty(Mono.error(new AlreadyShutdownException()));
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Mono<Void> openBucket(String str) {
        return Mono.defer(() -> {
            if (this.shutdown.get()) {
                return Mono.error(new AlreadyShutdownException());
            }
            this.bucketConfigLoadInProgress.incrementAndGet();
            boolean tlsEnabled = this.core.context().environment().securityConfig().tlsEnabled();
            Mono then = waitForSeedNodes().flatMap(set -> {
                return fetchBucketConfigs(str, set, tlsEnabled).switchIfEmpty(Mono.error(new ConfigException("Could not locate a single bucket configuration for bucket: " + str)));
            }).map(proposedBucketConfigContext -> {
                proposeBucketConfig(proposedBucketConfigContext);
                return proposedBucketConfigContext;
            }).then(registerRefresher(str));
            AtomicInteger atomicInteger = this.bucketConfigLoadInProgress;
            Objects.requireNonNull(atomicInteger);
            return then.doOnTerminate(atomicInteger::decrementAndGet).onErrorResume(th -> {
                return closeBucketIgnoreShutdown(str, true).then(Mono.error(th));
            });
        });
    }

    protected Mono<ProposedBucketConfigContext> loadBucketConfigForSeed(NodeIdentifier nodeIdentifier, int i, int i2, String str) {
        return this.keyValueLoader.load(nodeIdentifier, i, str).onErrorResume(th -> {
            return (((th instanceof ConfigException) && (th.getCause() instanceof RequestCanceledException) && ((RequestCanceledException) th.getCause()).reason() == CancellationReason.TARGET_NODE_REMOVED) || (th instanceof SeedNodeOutdatedException)) ? Mono.error(th) : this.clusterManagerLoader.load(nodeIdentifier, i2, str);
        }).flatMap(proposedBucketConfigContext -> {
            return Mapper.decodeIntoTree(proposedBucketConfigContext.config()).get("nodes").isEmpty() ? Mono.error(new BucketNotReadyDuringLoadException("No KV node in the config (yet), can't use it for now.")) : Mono.just(proposedBucketConfigContext);
        });
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Mono<Void> loadAndRefreshGlobalConfig() {
        return Mono.defer(() -> {
            if (this.shutdown.get()) {
                return Mono.error(new AlreadyShutdownException());
            }
            this.globalConfigLoadInProgress = true;
            boolean tlsEnabled = this.core.context().environment().securityConfig().tlsEnabled();
            return waitForSeedNodes().flatMap(set -> {
                return fetchGlobalConfigs(set, tlsEnabled, false, true).switchIfEmpty(Mono.error(new ConfigException("Could not locate a single global configuration")));
            }).map(proposedGlobalConfigContext -> {
                proposeGlobalConfig(proposedGlobalConfigContext);
                return proposedGlobalConfigContext;
            }).then(this.globalRefresher.start()).doOnTerminate(() -> {
                this.globalConfigLoadInProgress = false;
            });
        });
    }

    private ClusterTopology parseClusterTopology(String str, String str2) {
        TopologyParser topologyParser = this.topologyParser;
        if (topologyParser == null) {
            throw new IllegalStateException("Can't parse cluster topology until seed nodes are resolved.");
        }
        return topologyParser.parse(str, str2);
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public void proposeBucketConfig(ProposedBucketConfigContext proposedBucketConfigContext) {
        if (this.shutdown.get()) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.ALREADY_SHUTDOWN, Optional.empty(), Optional.of(proposedBucketConfigContext.config()), Optional.of(proposedBucketConfigContext.bucketName())));
            return;
        }
        try {
            if (proposedBucketConfigContext.config().isEmpty()) {
                this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.OLD_OR_SAME_REVISION, Optional.empty(), Optional.empty(), Optional.of(proposedBucketConfigContext.bucketName())));
            } else {
                checkAndApplyConfig(LegacyConfigHelper.toLegacyBucketConfig(parseClusterTopology(proposedBucketConfigContext.config(), proposedBucketConfigContext.origin()).requireBucket()), proposedBucketConfigContext.forcesOverride());
            }
        } catch (Exception e) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.PARSE_FAILURE, Optional.of(e), Optional.of(proposedBucketConfigContext.config()), Optional.of(proposedBucketConfigContext.bucketName())));
        }
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public void proposeGlobalConfig(ProposedGlobalConfigContext proposedGlobalConfigContext) {
        if (this.shutdown.get()) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.ALREADY_SHUTDOWN, Optional.empty(), Optional.of(proposedGlobalConfigContext.config()), Optional.empty()));
            return;
        }
        try {
            if (proposedGlobalConfigContext.config().isEmpty()) {
                this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.OLD_OR_SAME_REVISION, Optional.empty(), Optional.empty(), Optional.empty()));
            } else {
                checkAndApplyConfig(new GlobalConfig(parseClusterTopology(proposedGlobalConfigContext.config(), proposedGlobalConfigContext.origin())), proposedGlobalConfigContext.forcesOverride());
            }
        } catch (Exception e) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.PARSE_FAILURE, Optional.of(e), Optional.of(proposedGlobalConfigContext.config()), Optional.empty()));
        }
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Mono<Void> closeBucket(String str, boolean z) {
        return Mono.defer(() -> {
            return this.shutdown.get() ? Mono.error(new AlreadyShutdownException()) : closeBucketIgnoreShutdown(str, z);
        });
    }

    private Mono<Void> closeBucketIgnoreShutdown(String str, boolean z) {
        return Mono.defer(() -> {
            this.currentConfig.deleteBucketConfig(str);
            if (z) {
                pushConfig(false);
            }
            return Mono.empty();
        }).then(this.keyValueRefresher.deregister(str)).then(this.clusterManagerRefresher.deregister(str));
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            return this.shutdown.compareAndSet(false, true) ? Flux.fromIterable(this.currentConfig.bucketConfigs().values()).flatMap(bucketConfig -> {
                return closeBucketIgnoreShutdown(bucketConfig.name(), false);
            }).then(Mono.defer(this::disableAndClearGlobalConfig)).doOnTerminate(() -> {
                pushConfig(true);
                this.configsSink.emitComplete(Reactor.emitFailureHandler());
                this.seedNodesSink.emitComplete(Reactor.emitFailureHandler());
                this.seedNodeResolver.dispose();
            }).then(this.keyValueRefresher.shutdown()).then(this.clusterManagerRefresher.shutdown()).then(this.globalRefresher.shutdown()) : Mono.error(new AlreadyShutdownException());
        });
    }

    private Mono<Void> disableAndClearGlobalConfig() {
        return this.globalRefresher.stop().then(Mono.defer(() -> {
            this.currentConfig.deleteGlobalConfig();
            return Mono.empty();
        }));
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public synchronized void refreshCollectionId(CollectionIdentifier collectionIdentifier) {
        if (collectionRefreshInProgress(collectionIdentifier)) {
            this.eventBus.publish(new CollectionMapRefreshIgnoredEvent(this.core.context(), collectionIdentifier));
            return;
        }
        this.collectionMapRefreshInProgress.add(collectionIdentifier);
        NanoTimestamp now = NanoTimestamp.now();
        GetCollectionIdRequest getCollectionIdRequest = new GetCollectionIdRequest(this.core.context().environment().timeoutConfig().kvTimeout(), this.core.context(), BestEffortRetryStrategy.INSTANCE, collectionIdentifier);
        this.core.send(getCollectionIdRequest);
        getCollectionIdRequest.response().whenComplete((BiConsumer<? super R, ? super Throwable>) (getCollectionIdResponse, th) -> {
            CollectionMapRefreshFailedEvent.Reason reason;
            try {
                Duration elapsed = now.elapsed();
                if (th != null) {
                    this.eventBus.publish(new CollectionMapRefreshFailedEvent(elapsed, this.core.context(), collectionIdentifier, th, CollectionMapRefreshFailedEvent.Reason.FAILED));
                    this.collectionMapRefreshInProgress.remove(collectionIdentifier);
                    return;
                }
                if (!getCollectionIdResponse.status().success()) {
                    CouchbaseException couchbaseException = null;
                    if (getCollectionIdResponse.status() == ResponseStatus.UNKNOWN || getCollectionIdResponse.status() == ResponseStatus.NO_COLLECTIONS_MANIFEST) {
                        reason = CollectionMapRefreshFailedEvent.Reason.NOT_SUPPORTED;
                    } else if (getCollectionIdResponse.status() == ResponseStatus.UNKNOWN_COLLECTION) {
                        reason = CollectionMapRefreshFailedEvent.Reason.UNKNOWN_COLLECTION;
                    } else if (getCollectionIdResponse.status() == ResponseStatus.UNKNOWN_SCOPE) {
                        reason = CollectionMapRefreshFailedEvent.Reason.UNKNOWN_SCOPE;
                    } else if (getCollectionIdResponse.status() == ResponseStatus.INVALID_REQUEST) {
                        reason = CollectionMapRefreshFailedEvent.Reason.INVALID_REQUEST;
                    } else {
                        couchbaseException = new CouchbaseException(getCollectionIdResponse.toString());
                        reason = CollectionMapRefreshFailedEvent.Reason.UNKNOWN;
                    }
                    this.eventBus.publish(new CollectionMapRefreshFailedEvent(elapsed, this.core.context(), collectionIdentifier, couchbaseException, reason));
                } else if (getCollectionIdResponse.collectionId().isPresent()) {
                    long longValue = getCollectionIdResponse.collectionId().get().longValue();
                    this.collectionMap.put(collectionIdentifier, UnsignedLEB128.encode(longValue));
                    this.eventBus.publish(new CollectionMapRefreshSucceededEvent(elapsed, this.core.context(), collectionIdentifier, longValue));
                } else {
                    this.eventBus.publish(new CollectionMapRefreshFailedEvent(elapsed, this.core.context(), collectionIdentifier, null, CollectionMapRefreshFailedEvent.Reason.COLLECTION_ID_NOT_PRESENT));
                }
            } finally {
                this.collectionMapRefreshInProgress.remove(collectionIdentifier);
            }
        });
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public boolean collectionRefreshInProgress() {
        return !this.collectionMapRefreshInProgress.isEmpty();
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public boolean collectionRefreshInProgress(CollectionIdentifier collectionIdentifier) {
        return this.collectionMapRefreshInProgress.contains(collectionIdentifier);
    }

    private synchronized void checkAndApplyConfig(BucketConfig bucketConfig, boolean z) {
        String name = bucketConfig.name();
        BucketConfig bucketConfig2 = this.currentConfig.bucketConfig(name);
        if (!z && bucketConfig2 != null && bucketConfig.version().isLessThanOrEqualTo(bucketConfig2.version())) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.OLD_OR_SAME_REVISION, Optional.empty(), Optional.empty(), Optional.of(bucketConfig.name())));
            return;
        }
        if (bucketConfig.tainted()) {
            this.keyValueRefresher.markTainted(name);
            this.clusterManagerRefresher.markTainted(name);
        } else {
            this.keyValueRefresher.markUntainted(name);
            this.clusterManagerRefresher.markUntainted(name);
        }
        this.eventBus.publish(new BucketConfigUpdatedEvent(this.core.context(), bucketConfig));
        this.currentConfig.setBucketConfig(bucketConfig);
        updateSeedNodeList();
        pushConfig(false);
    }

    private synchronized void checkAndApplyConfig(GlobalConfig globalConfig, boolean z) {
        GlobalConfig globalConfig2 = this.currentConfig.globalConfig();
        if (!z && globalConfig2 != null && globalConfig.version().isLessThanOrEqualTo(globalConfig2.version())) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.OLD_OR_SAME_REVISION, Optional.empty(), Optional.empty(), Optional.empty()));
            return;
        }
        this.eventBus.publish(new GlobalConfigUpdatedEvent(this.core.context(), globalConfig));
        this.currentConfig.setGlobalConfig(globalConfig);
        updateSeedNodeList();
        pushConfig(false);
    }

    private void updateSeedNodeList() {
        ClusterConfig clusterConfig = this.currentConfig;
        boolean tlsEnabled = this.core.context().environment().securityConfig().tlsEnabled();
        if (clusterConfig.globalConfig() != null) {
            Set<SeedNode> unmodifiableSet = Collections.unmodifiableSet((Set) clusterConfig.globalConfig().portInfos().stream().map(portInfo -> {
                Map<ServiceType, Integer> sslPorts = tlsEnabled ? portInfo.sslPorts() : portInfo.ports();
                if (sslPorts.containsKey(ServiceType.KV)) {
                    return SeedNode.create(portInfo.hostname(), Optional.ofNullable(sslPorts.get(ServiceType.KV)), Optional.ofNullable(sslPorts.get(ServiceType.MANAGER)));
                }
                return null;
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toSet()));
            if (unmodifiableSet.isEmpty()) {
                return;
            }
            this.eventBus.publish(new SeedNodesUpdatedEvent(this.core.context(), currentSeedNodes(), unmodifiableSet));
            setSeedNodes(unmodifiableSet);
            return;
        }
        Set<SeedNode> unmodifiableSet2 = Collections.unmodifiableSet((Set) clusterConfig.bucketConfigs().values().stream().flatMap(bucketConfig -> {
            return bucketConfig.nodes().stream();
        }).map(nodeInfo -> {
            Map<ServiceType, Integer> sslServices = tlsEnabled ? nodeInfo.sslServices() : nodeInfo.services();
            if (sslServices.containsKey(ServiceType.KV)) {
                return SeedNode.create(nodeInfo.hostname(), Optional.ofNullable(sslServices.get(ServiceType.KV)), Optional.ofNullable(sslServices.get(ServiceType.MANAGER)));
            }
            return null;
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toSet()));
        if (unmodifiableSet2.isEmpty()) {
            return;
        }
        this.eventBus.publish(new SeedNodesUpdatedEvent(this.core.context(), currentSeedNodes(), unmodifiableSet2));
        setSeedNodes(unmodifiableSet2);
    }

    private synchronized void pushConfig(boolean z) {
        if (!z && this.shutdown.get()) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.ALREADY_SHUTDOWN, Optional.empty(), Optional.empty(), Optional.empty()));
            return;
        }
        Sinks.EmitResult tryEmitNext = this.configsSink.tryEmitNext(this.currentConfig);
        if (tryEmitNext != Sinks.EmitResult.OK) {
            this.eventBus.publish(new ConfigPushFailedEvent(this.core.context(), tryEmitNext));
        }
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public void republishCurrentConfig() {
        pushConfig(false);
    }

    protected Mono<Void> registerRefresher(String str) {
        return Mono.defer(() -> {
            BucketConfig bucketConfig = this.currentConfig.bucketConfig(str);
            return bucketConfig == null ? Mono.error(new CouchbaseException("Bucket for registration does not exist, this is an error! Please report")) : bucketConfig instanceof CouchbaseBucketConfig ? this.keyValueRefresher.register(str) : this.clusterManagerRefresher.register(str);
        });
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public boolean globalConfigLoadInProgress() {
        return this.globalConfigLoadInProgress;
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public boolean bucketConfigLoadInProgress() {
        return this.bucketConfigLoadInProgress.get() > 0;
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public void signalConfigRefreshFailed(ConfigRefreshFailure configRefreshFailure) {
        if (configRefreshFailure == ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS) {
            handlePotentialDnsSrvRefresh();
        }
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public synchronized void signalConfigChanged() {
        this.configPollTrigger.tryEmitNext(-1L);
    }

    @Override // com.couchbase.client.core.config.ConfigurationProvider
    public Flux<Long> configChangeNotifications() {
        return this.configPollTrigger.asFlux();
    }

    private synchronized void handlePotentialDnsSrvRefresh() {
        CoreContext context = this.core.context();
        CoreEnvironment environment = context.environment();
        boolean z = this.connectionString.isValidDnsSrv() && environment.ioConfig().dnsSrvEnabled();
        boolean tlsEnabled = environment.securityConfig().tlsEnabled();
        if (z && this.lastDnsSrvLookup.hasElapsed(MIN_TIME_BETWEEN_DNS_LOOKUPS)) {
            this.lastDnsSrvLookup = NanoTimestamp.now();
            NanoTimestamp now = NanoTimestamp.now();
            Schedulers.boundedElastic().schedule(() -> {
                try {
                    List<String> performDnsSrvLookup = performDnsSrvLookup(tlsEnabled);
                    if (performDnsSrvLookup.isEmpty()) {
                        environment.eventBus().publish(new DnsSrvRefreshAttemptFailedEvent(now.elapsed(), context, DnsSrvRefreshAttemptFailedEvent.Reason.NO_NEW_SEEDS_RETURNED, null));
                        return;
                    }
                    Set<SeedNode> set = (Set) performDnsSrvLookup.stream().map(SeedNode::create).collect(Collectors.toSet());
                    ProposedGlobalConfigContext block = fetchGlobalConfigs(set, tlsEnabled, true, false).block();
                    if (block != null) {
                        proposeGlobalConfig(block.forceOverride());
                    }
                    Iterator<String> it = this.keyValueRefresher.registered().iterator();
                    while (it.hasNext()) {
                        ProposedBucketConfigContext block2 = fetchBucketConfigs(it.next(), set, tlsEnabled).block();
                        if (block2 != null) {
                            proposeBucketConfig(block2.forceOverride());
                        }
                    }
                    environment.eventBus().publish(new DnsSrvRefreshAttemptCompletedEvent(now.elapsed(), context, performDnsSrvLookup));
                } catch (Exception e) {
                    environment.eventBus().publish(new DnsSrvRefreshAttemptFailedEvent(now.elapsed(), context, DnsSrvRefreshAttemptFailedEvent.Reason.OTHER, e));
                }
            });
        }
    }

    protected List<String> performDnsSrvLookup(boolean z) throws NamingException {
        return ConnectionStringUtil.fromDnsSrvOrThrowIfTlsRequired(this.connectionString.hosts().get(0).host(), z);
    }

    private Mono<ProposedBucketConfigContext> fetchBucketConfigs(String str, Set<SeedNode> set, boolean z) {
        int i = z ? DEFAULT_KV_TLS_PORT : DEFAULT_KV_PORT;
        int i2 = z ? DEFAULT_MANAGER_TLS_PORT : 8091;
        return Flux.range(1, Math.min(5, set.size())).flatMap(num -> {
            return Flux.fromIterable(set).take(Math.min(num.intValue(), set.size())).last().flatMap(seedNode -> {
                return loadBucketConfigForSeed(NodeIdentifier.forBootstrap(seedNode.address(), seedNode.clusterManagerPort().orElse(8091).intValue()), seedNode.kvPort().orElse(Integer.valueOf(i)).intValue(), seedNode.clusterManagerPort().orElse(Integer.valueOf(i2)).intValue(), str);
            }).retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(500L)).maxBackoff(Duration.ofSeconds(10L)).filter(bucketConfigLoadRetryFilter(str, th -> {
                return isInstanceOfAnyOf(th, BucketNotFoundDuringLoadException.class, BucketNotReadyDuringLoadException.class, NoAccessDuringConfigLoadException.class);
            }))).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofMillis(10L)).filter(bucketConfigLoadRetryFilter(str, th2 -> {
                return !(th2 instanceof UnsupportedConfigMechanismException);
            })));
        }).next();
    }

    private Predicate<? super Throwable> bucketConfigLoadRetryFilter(String str, Predicate<? super Throwable> predicate) {
        return th -> {
            if (this.shutdown.get()) {
                throw new AlreadyShutdownException();
            }
            boolean test = predicate.test(th);
            if (test) {
                this.eventBus.publish(new BucketOpenRetriedEvent(str, Duration.ZERO, this.core.context(), th));
            }
            return test;
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static boolean isInstanceOfAnyOf(Object obj, Class<?>... clsArr) {
        return Arrays.stream(clsArr).anyMatch(cls -> {
            return cls.isInstance(obj);
        });
    }

    private Mono<ProposedGlobalConfigContext> fetchGlobalConfigs(Set<SeedNode> set, boolean z, boolean z2, boolean z3) {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        int i = z ? DEFAULT_KV_TLS_PORT : DEFAULT_KV_PORT;
        return Flux.range(1, Math.min(5, set.size())).flatMap(num -> {
            return Flux.fromIterable(set).take(Math.min(num.intValue(), set.size())).last().flatMap(seedNode -> {
                NanoTimestamp now = NanoTimestamp.now();
                if (!z2 && !currentSeedNodes().contains(seedNode)) {
                    return Mono.empty();
                }
                return this.globalLoader.load(NodeIdentifier.forBootstrap(seedNode.address(), seedNode.clusterManagerPort().orElse(8091).intValue()), seedNode.kvPort().orElse(Integer.valueOf(i)).intValue()).doOnError(th -> {
                    this.core.context().environment().eventBus().publish(new IndividualGlobalConfigLoadFailedEvent(now.elapsed(), this.core.context(), th, seedNode.address()));
                });
            }).retryWhen(Retry.from(flux -> {
                return flux.flatMap(retrySignal -> {
                    Throwable failure = retrySignal.failure();
                    if (this.shutdown.get()) {
                        return Mono.error(new AlreadyShutdownException());
                    }
                    if (failure instanceof UnsupportedConfigMechanismException) {
                        return Mono.error(Exceptions.propagate(failure));
                    }
                    if (!z3 && (failure.getCause() instanceof TimeoutException)) {
                        return Mono.error(failure.getCause());
                    }
                    Duration ofMillis = Duration.ofMillis(1L);
                    this.eventBus.publish(new GlobalConfigRetriedEvent(ofMillis, this.core.context(), failure));
                    return Mono.just(Long.valueOf(retrySignal.totalRetries())).delayElement(ofMillis, this.core.context().environment().scheduler());
                });
            })).onErrorResume(th -> {
                return atomicBoolean.compareAndSet(false, true) ? Mono.error(th) : Mono.empty();
            });
        }).next();
    }

    private Set<SeedNode> currentSeedNodes() {
        return this.currentSeedNodes.get();
    }

    private synchronized Sinks.EmitResult setSeedNodes(Set<SeedNode> set) {
        this.currentSeedNodes.set(set);
        Sinks.EmitResult tryEmitNext = this.seedNodesSink.tryEmitNext(set);
        if (tryEmitNext != Sinks.EmitResult.OK) {
            this.eventBus.publish(new SeedNodesUpdateFailedEvent(this.core.context(), tryEmitNext));
        }
        return tryEmitNext;
    }
}
