/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.config;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.config.BucketConfigUpdatedEvent;
import com.couchbase.client.core.cnc.events.config.BucketOpenRetriedEvent;
import com.couchbase.client.core.cnc.events.config.CollectionMapRefreshFailedEvent;
import com.couchbase.client.core.cnc.events.config.CollectionMapRefreshIgnoredEvent;
import com.couchbase.client.core.cnc.events.config.CollectionMapRefreshSucceededEvent;
import com.couchbase.client.core.cnc.events.config.ConfigIgnoredEvent;
import com.couchbase.client.core.cnc.events.config.ConfigPushFailedEvent;
import com.couchbase.client.core.cnc.events.config.DnsSrvRefreshAttemptCompletedEvent;
import com.couchbase.client.core.cnc.events.config.DnsSrvRefreshAttemptFailedEvent;
import com.couchbase.client.core.cnc.events.config.GlobalConfigRetriedEvent;
import com.couchbase.client.core.cnc.events.config.GlobalConfigUpdatedEvent;
import com.couchbase.client.core.cnc.events.config.IndividualGlobalConfigLoadFailedEvent;
import com.couchbase.client.core.cnc.events.config.SeedNodesUpdateFailedEvent;
import com.couchbase.client.core.cnc.events.config.SeedNodesUpdatedEvent;
import com.couchbase.client.core.config.AlternateAddress;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.BucketConfigParser;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.config.ConfigRefreshFailure;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.config.GlobalConfig;
import com.couchbase.client.core.config.GlobalConfigParser;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.config.PortInfo;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.config.ProposedGlobalConfigContext;
import com.couchbase.client.core.config.loader.ClusterManagerBucketLoader;
import com.couchbase.client.core.config.loader.GlobalLoader;
import com.couchbase.client.core.config.loader.KeyValueBucketLoader;
import com.couchbase.client.core.config.refresher.ClusterManagerBucketRefresher;
import com.couchbase.client.core.config.refresher.GlobalRefresher;
import com.couchbase.client.core.config.refresher.KeyValueBucketRefresher;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.JsonNode;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.env.NetworkResolution;
import com.couchbase.client.core.env.SeedNode;
import com.couchbase.client.core.error.AlreadyShutdownException;
import com.couchbase.client.core.error.BucketNotFoundDuringLoadException;
import com.couchbase.client.core.error.BucketNotReadyDuringLoadException;
import com.couchbase.client.core.error.ConfigException;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.RequestCanceledException;
import com.couchbase.client.core.error.SeedNodeOutdatedException;
import com.couchbase.client.core.error.TimeoutException;
import com.couchbase.client.core.error.UnsupportedConfigMechanismException;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.io.CollectionMap;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.CancellationReason;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.kv.GetCollectionIdRequest;
import com.couchbase.client.core.node.NodeIdentifier;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.ConnectionString;
import com.couchbase.client.core.util.ConnectionStringUtil;
import com.couchbase.client.core.util.NanoTimestamp;
import com.couchbase.client.core.util.UnsignedLEB128;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.naming.NamingException;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

public class DefaultConfigurationProvider
implements ConfigurationProvider {
    private static final Duration MIN_TIME_BETWEEN_DNS_LOOKUPS = Duration.ofSeconds(10L);
    private static final int DEFAULT_KV_PORT = 11210;
    private static final int DEFAULT_MANAGER_PORT = 8091;
    private static final int DEFAULT_KV_TLS_PORT = 11207;
    private static final int DEFAULT_MANAGER_TLS_PORT = 18091;
    private static final int MAX_PARALLEL_LOADERS = 5;
    private final Core core;
    private final EventBus eventBus;
    private final KeyValueBucketLoader keyValueLoader;
    private final ClusterManagerBucketLoader clusterManagerLoader;
    private final KeyValueBucketRefresher keyValueRefresher;
    private final ClusterManagerBucketRefresher clusterManagerRefresher;
    private final GlobalLoader globalLoader;
    private final GlobalRefresher globalRefresher;
    private final Sinks.Many<ClusterConfig> configsSink = Sinks.many().replay().latest();
    private final Flux<ClusterConfig> configs = this.configsSink.asFlux();
    private final ClusterConfig currentConfig = new ClusterConfig();
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final CollectionMap collectionMap = new CollectionMap();
    private final AtomicBoolean alternateAddrChecked = new AtomicBoolean(false);
    private volatile boolean globalConfigLoadInProgress = false;
    private final AtomicInteger bucketConfigLoadInProgress = new AtomicInteger();
    final Set<CollectionIdentifier> collectionMapRefreshInProgress = ConcurrentHashMap.newKeySet();
    private final AtomicReference<Set<SeedNode>> currentSeedNodes;
    private final Sinks.Many<Set<SeedNode>> seedNodesSink = Sinks.many().replay().latest();
    private final Flux<Set<SeedNode>> seedNodes = this.seedNodesSink.asFlux();
    private final ConnectionString connectionString;
    private volatile NanoTimestamp lastDnsSrvLookup = NanoTimestamp.never();

    public DefaultConfigurationProvider(Core core, Set<SeedNode> seedNodes) {
        this(core, seedNodes, null);
    }

    public DefaultConfigurationProvider(Core core, Set<SeedNode> seedNodes, String connectionString) {
        this.core = core;
        this.eventBus = core.context().environment().eventBus();
        this.connectionString = connectionString != null ? ConnectionString.create(connectionString) : null;
        this.currentSeedNodes = new AtomicReference<Set<SeedNode>>(CbCollections.copyToUnmodifiableSet(seedNodes));
        this.keyValueLoader = new KeyValueBucketLoader(core);
        this.clusterManagerLoader = new ClusterManagerBucketLoader(core);
        this.keyValueRefresher = new KeyValueBucketRefresher(this, core);
        this.clusterManagerRefresher = new ClusterManagerBucketRefresher(this, core);
        this.globalLoader = new GlobalLoader(core);
        this.globalRefresher = new GlobalRefresher(this, core);
        this.configsSink.emitNext(this.currentConfig, Reactor.emitFailureHandler());
    }

    @Override
    public CollectionMap collectionMap() {
        return this.collectionMap;
    }

    @Override
    public Flux<ClusterConfig> configs() {
        return this.configs;
    }

    @Override
    public ClusterConfig config() {
        return this.currentConfig;
    }

    @Override
    public Flux<Set<SeedNode>> seedNodes() {
        return this.seedNodes;
    }

    @Override
    public Mono<Void> openBucket(String name) {
        return Mono.defer(() -> {
            if (!this.shutdown.get()) {
                this.bucketConfigLoadInProgress.incrementAndGet();
                boolean tls = this.core.context().environment().securityConfig().tlsEnabled();
                return this.fetchBucketConfigs(name, this.currentSeedNodes.get(), tls).switchIfEmpty(Mono.error(new ConfigException("Could not locate a single bucket configuration for bucket: " + name))).map(ctx -> {
                    this.proposeBucketConfig((ProposedBucketConfigContext)ctx);
                    return ctx;
                }).then(this.registerRefresher(name)).doOnTerminate(this.bucketConfigLoadInProgress::decrementAndGet).onErrorResume(t -> this.closeBucketIgnoreShutdown(name).then(Mono.error(t)));
            }
            return Mono.error(new AlreadyShutdownException());
        });
    }

    protected Mono<ProposedBucketConfigContext> loadBucketConfigForSeed(NodeIdentifier identifier, int mappedKvPort, int mappedManagerPort, String name, Optional<String> alternateAddress) {
        return this.keyValueLoader.load(identifier, mappedKvPort, name, alternateAddress).onErrorResume(t -> {
            boolean removedWhileOpInFlight = t instanceof ConfigException && t.getCause() instanceof RequestCanceledException && ((RequestCanceledException)t.getCause()).reason() == CancellationReason.TARGET_NODE_REMOVED;
            boolean seedNodeOutdated = t instanceof SeedNodeOutdatedException;
            if (removedWhileOpInFlight || seedNodeOutdated) {
                return Mono.error(t);
            }
            return this.clusterManagerLoader.load(identifier, mappedManagerPort, name, alternateAddress);
        }).flatMap(ctx -> {
            JsonNode configRoot = Mapper.decodeIntoTree(ctx.config());
            if (configRoot.get("nodes").isEmpty()) {
                return Mono.error(new BucketNotReadyDuringLoadException("No KV node in the config (yet), can't use it for now."));
            }
            return Mono.just(ctx);
        });
    }

    private String mapAlternateAddress(String a, SeedNode seed, boolean tls, AtomicReference<Map<ServiceType, Integer>> alternatePorts) {
        ClusterConfig c = this.currentConfig;
        if (c.globalConfig() != null) {
            for (PortInfo pi : c.globalConfig().portInfos()) {
                if (!seed.address().equals(pi.hostname())) continue;
                alternatePorts.set(tls ? pi.alternateAddresses().get(a).sslServices() : pi.alternateAddresses().get(a).services());
                return pi.alternateAddresses().get(a).hostname();
            }
        }
        List nodeInfos = c.bucketConfigs().values().stream().flatMap(bc -> bc.nodes().stream()).collect(Collectors.toList());
        for (NodeInfo ni : nodeInfos) {
            if (!ni.hostname().equals(seed.address())) continue;
            alternatePorts.set(tls ? ni.alternateAddresses().get(a).sslServices() : ni.alternateAddresses().get(a).services());
            return ni.alternateAddresses().get(a).hostname();
        }
        return null;
    }

    @Override
    public Mono<Void> loadAndRefreshGlobalConfig() {
        return Mono.defer(() -> {
            if (!this.shutdown.get()) {
                this.globalConfigLoadInProgress = true;
                boolean tls = this.core.context().environment().securityConfig().tlsEnabled();
                return this.fetchGlobalConfigs(this.currentSeedNodes.get(), tls, false, true).switchIfEmpty(Mono.error(new ConfigException("Could not locate a single global configuration"))).map(ctx -> {
                    this.proposeGlobalConfig((ProposedGlobalConfigContext)ctx);
                    return ctx;
                }).then(this.globalRefresher.start()).doOnTerminate(() -> {
                    this.globalConfigLoadInProgress = false;
                });
            }
            return Mono.error(new AlreadyShutdownException());
        });
    }

    @Override
    public void proposeBucketConfig(ProposedBucketConfigContext ctx) {
        if (!this.shutdown.get()) {
            try {
                BucketConfig config = BucketConfigParser.parse(ctx.config(), this.core.context().environment(), ctx.origin());
                this.checkAndApplyConfig(config, ctx.forcesOverride());
            }
            catch (Exception ex) {
                this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.PARSE_FAILURE, Optional.of(ex), Optional.of(ctx.config()), Optional.of(ctx.bucketName())));
            }
        } else {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.ALREADY_SHUTDOWN, Optional.empty(), Optional.of(ctx.config()), Optional.of(ctx.bucketName())));
        }
    }

    @Override
    public void proposeGlobalConfig(ProposedGlobalConfigContext ctx) {
        if (!this.shutdown.get()) {
            try {
                GlobalConfig config = GlobalConfigParser.parse(ctx.config(), ctx.origin());
                this.checkAndApplyConfig(config, ctx.forcesOverride());
            }
            catch (Exception ex) {
                this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.PARSE_FAILURE, Optional.of(ex), Optional.of(ctx.config()), Optional.empty()));
            }
        } else {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.ALREADY_SHUTDOWN, Optional.empty(), Optional.of(ctx.config()), Optional.empty()));
        }
    }

    @Override
    public Mono<Void> closeBucket(String name) {
        return Mono.defer(() -> this.shutdown.get() ? Mono.error(new AlreadyShutdownException()) : this.closeBucketIgnoreShutdown(name));
    }

    private Mono<Void> closeBucketIgnoreShutdown(String name) {
        return Mono.defer(() -> {
            this.currentConfig.deleteBucketConfig(name);
            this.pushConfig();
            return Mono.empty();
        }).then(this.keyValueRefresher.deregister(name)).then(this.clusterManagerRefresher.deregister(name));
    }

    @Override
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            if (this.shutdown.compareAndSet(false, true)) {
                return Flux.fromIterable(this.currentConfig.bucketConfigs().values()).flatMap(bucketConfig -> this.closeBucketIgnoreShutdown(bucketConfig.name())).then(Mono.defer(this::disableAndClearGlobalConfig)).doOnTerminate(() -> {
                    this.pushConfig();
                    this.configsSink.emitComplete(Reactor.emitFailureHandler());
                }).then(this.keyValueRefresher.shutdown()).then(this.clusterManagerRefresher.shutdown()).then(this.globalRefresher.shutdown());
            }
            return Mono.error(new AlreadyShutdownException());
        });
    }

    private Mono<Void> disableAndClearGlobalConfig() {
        return this.globalRefresher.stop().then(Mono.defer(() -> {
            this.currentConfig.deleteGlobalConfig();
            return Mono.empty();
        }));
    }

    @Override
    public synchronized void refreshCollectionId(CollectionIdentifier identifier) {
        if (this.collectionRefreshInProgress(identifier)) {
            this.eventBus.publish(new CollectionMapRefreshIgnoredEvent(this.core.context(), identifier));
            return;
        }
        this.collectionMapRefreshInProgress.add(identifier);
        NanoTimestamp start = NanoTimestamp.now();
        GetCollectionIdRequest request = new GetCollectionIdRequest(this.core.context().environment().timeoutConfig().kvTimeout(), this.core.context(), (RetryStrategy)BestEffortRetryStrategy.INSTANCE, identifier);
        this.core.send(request);
        request.response().whenComplete((response, throwable) -> {
            try {
                Duration duration = start.elapsed();
                if (throwable != null) {
                    this.eventBus.publish(new CollectionMapRefreshFailedEvent(duration, this.core.context(), identifier, (Throwable)throwable, CollectionMapRefreshFailedEvent.Reason.FAILED));
                    return;
                }
                if (response.status().success()) {
                    if (response.collectionId().isPresent()) {
                        long cid = response.collectionId().get();
                        this.collectionMap.put(identifier, UnsignedLEB128.encode(cid));
                        this.eventBus.publish(new CollectionMapRefreshSucceededEvent(duration, this.core.context(), identifier, cid));
                    } else {
                        this.eventBus.publish(new CollectionMapRefreshFailedEvent(duration, this.core.context(), identifier, null, CollectionMapRefreshFailedEvent.Reason.COLLECTION_ID_NOT_PRESENT));
                    }
                } else {
                    CollectionMapRefreshFailedEvent.Reason reason;
                    CouchbaseException cause = null;
                    if (response.status() == ResponseStatus.UNKNOWN || response.status() == ResponseStatus.NO_COLLECTIONS_MANIFEST) {
                        reason = CollectionMapRefreshFailedEvent.Reason.NOT_SUPPORTED;
                    } else if (response.status() == ResponseStatus.UNKNOWN_COLLECTION) {
                        reason = CollectionMapRefreshFailedEvent.Reason.UNKNOWN_COLLECTION;
                    } else if (response.status() == ResponseStatus.UNKNOWN_SCOPE) {
                        reason = CollectionMapRefreshFailedEvent.Reason.UNKNOWN_SCOPE;
                    } else if (response.status() == ResponseStatus.INVALID_REQUEST) {
                        reason = CollectionMapRefreshFailedEvent.Reason.INVALID_REQUEST;
                    } else {
                        cause = new CouchbaseException(response.toString());
                        reason = CollectionMapRefreshFailedEvent.Reason.UNKNOWN;
                    }
                    this.eventBus.publish(new CollectionMapRefreshFailedEvent(duration, this.core.context(), identifier, cause, reason));
                }
            }
            finally {
                this.collectionMapRefreshInProgress.remove(identifier);
            }
        });
    }

    @Override
    public boolean collectionRefreshInProgress() {
        return !this.collectionMapRefreshInProgress.isEmpty();
    }

    @Override
    public boolean collectionRefreshInProgress(CollectionIdentifier identifier) {
        return this.collectionMapRefreshInProgress.contains(identifier);
    }

    private void checkAndApplyConfig(BucketConfig newConfig, boolean force) {
        String name = newConfig.name();
        BucketConfig oldConfig = this.currentConfig.bucketConfig(name);
        if (!force && oldConfig != null && this.configIsOlderOrSame(oldConfig.revEpoch(), newConfig.revEpoch(), oldConfig.rev(), newConfig.rev())) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.OLD_OR_SAME_REVISION, Optional.empty(), Optional.empty(), Optional.of(newConfig.name())));
            return;
        }
        if (newConfig.tainted()) {
            this.keyValueRefresher.markTainted(name);
            this.clusterManagerRefresher.markTainted(name);
        } else {
            this.keyValueRefresher.markUntainted(name);
            this.clusterManagerRefresher.markUntainted(name);
        }
        this.eventBus.publish(new BucketConfigUpdatedEvent(this.core.context(), newConfig));
        this.currentConfig.setBucketConfig(newConfig);
        this.checkAlternateAddress();
        this.updateSeedNodeList();
        this.pushConfig();
    }

    private void checkAndApplyConfig(GlobalConfig newConfig, boolean force) {
        GlobalConfig oldConfig = this.currentConfig.globalConfig();
        if (!force && oldConfig != null && this.configIsOlderOrSame(oldConfig.revEpoch(), newConfig.revEpoch(), oldConfig.rev(), newConfig.rev())) {
            this.eventBus.publish(new ConfigIgnoredEvent(this.core.context(), ConfigIgnoredEvent.Reason.OLD_OR_SAME_REVISION, Optional.empty(), Optional.empty(), Optional.empty()));
            return;
        }
        this.eventBus.publish(new GlobalConfigUpdatedEvent(this.core.context(), newConfig));
        this.currentConfig.setGlobalConfig(newConfig);
        this.checkAlternateAddress();
        this.updateSeedNodeList();
        this.pushConfig();
    }

    private boolean configIsOlderOrSame(long oldRevEpoch, long newRevEpoch, long oldRev, long newRev) {
        if (newRevEpoch < oldRevEpoch) {
            return true;
        }
        if (newRevEpoch > oldRevEpoch) {
            return false;
        }
        return newRev > 0L && newRev <= oldRev;
    }

    private void updateSeedNodeList() {
        ClusterConfig config = this.currentConfig;
        boolean tlsEnabled = this.core.context().environment().securityConfig().tlsEnabled();
        if (config.globalConfig() != null) {
            Set<SeedNode> seedNodes = Collections.unmodifiableSet(config.globalConfig().portInfos().stream().map(ni -> {
                Map<ServiceType, Integer> ports;
                Map<ServiceType, Integer> map = ports = tlsEnabled ? ni.sslPorts() : ni.ports();
                if (!ports.containsKey((Object)ServiceType.KV)) {
                    return null;
                }
                return SeedNode.create(ni.hostname(), Optional.ofNullable(ports.get((Object)ServiceType.KV)), Optional.ofNullable(ports.get((Object)ServiceType.MANAGER)));
            }).filter(Objects::nonNull).collect(Collectors.toSet()));
            if (!seedNodes.isEmpty()) {
                this.eventBus.publish(new SeedNodesUpdatedEvent(this.core.context(), this.currentSeedNodes(), seedNodes));
                this.setSeedNodes(seedNodes);
            }
            return;
        }
        Set<SeedNode> seedNodes = Collections.unmodifiableSet(config.bucketConfigs().values().stream().flatMap(bc -> bc.nodes().stream()).map(ni -> {
            Map<ServiceType, Integer> ports;
            Map<ServiceType, Integer> map = ports = tlsEnabled ? ni.sslServices() : ni.services();
            if (!ports.containsKey((Object)ServiceType.KV)) {
                return null;
            }
            return SeedNode.create(ni.hostname(), Optional.ofNullable(ports.get((Object)ServiceType.KV)), Optional.ofNullable(ports.get((Object)ServiceType.MANAGER)));
        }).filter(Objects::nonNull).collect(Collectors.toSet()));
        if (!seedNodes.isEmpty()) {
            this.eventBus.publish(new SeedNodesUpdatedEvent(this.core.context(), this.currentSeedNodes(), seedNodes));
            this.setSeedNodes(seedNodes);
        }
    }

    private synchronized void checkAlternateAddress() {
        if (this.alternateAddrChecked.compareAndSet(false, true)) {
            String resolved = DefaultConfigurationProvider.determineNetworkResolution(DefaultConfigurationProvider.extractAlternateAddressInfos(this.currentConfig), this.core.context().environment().ioConfig().networkResolution(), this.currentSeedNodes().stream().map(SeedNode::address).collect(Collectors.toSet()));
            this.core.context().alternateAddress(Optional.ofNullable(resolved));
        }
    }

    public static List<AlternateAddressHolder> extractAlternateAddressInfos(ClusterConfig config) {
        Stream<AlternateAddressHolder> holders = config.globalConfig() != null ? config.globalConfig().portInfos().stream().map(pi -> new AlternateAddressHolder(pi.hostname(), pi.alternateAddresses())) : config.bucketConfigs().values().stream().flatMap(bc -> bc.nodes().stream()).map(ni -> new AlternateAddressHolder(ni.hostname(), ni.alternateAddresses()));
        return holders.collect(Collectors.toList());
    }

    public static String determineNetworkResolution(List<AlternateAddressHolder> nodes, NetworkResolution nr, Set<String> seedHosts) {
        if (nr.equals(NetworkResolution.DEFAULT)) {
            return null;
        }
        if (nr.equals(NetworkResolution.AUTO)) {
            for (AlternateAddressHolder info : nodes) {
                if (seedHosts.contains(info.hostname())) {
                    return null;
                }
                Map<String, AlternateAddress> aa = info.alternateAddresses();
                if (aa == null || aa.isEmpty()) continue;
                for (Map.Entry<String, AlternateAddress> entry : aa.entrySet()) {
                    AlternateAddress alternateAddress = entry.getValue();
                    if (alternateAddress == null || !seedHosts.contains(alternateAddress.hostname())) continue;
                    return entry.getKey();
                }
            }
            return null;
        }
        return nr.name();
    }

    private synchronized void pushConfig() {
        Sinks.EmitResult emitResult = this.configsSink.tryEmitNext(this.currentConfig);
        if (emitResult != Sinks.EmitResult.OK) {
            this.eventBus.publish(new ConfigPushFailedEvent(this.core.context(), emitResult));
        }
    }

    protected Mono<Void> registerRefresher(String bucket) {
        return Mono.defer(() -> {
            BucketConfig config = this.currentConfig.bucketConfig(bucket);
            if (config == null) {
                return Mono.error(new CouchbaseException("Bucket for registration does not exist, this is an error! Please report"));
            }
            if (config instanceof CouchbaseBucketConfig) {
                return this.keyValueRefresher.register(bucket);
            }
            return this.clusterManagerRefresher.register(bucket);
        });
    }

    @Override
    public boolean globalConfigLoadInProgress() {
        return this.globalConfigLoadInProgress;
    }

    @Override
    public boolean bucketConfigLoadInProgress() {
        return this.bucketConfigLoadInProgress.get() > 0;
    }

    @Override
    public void signalConfigRefreshFailed(ConfigRefreshFailure failure) {
        if (failure == ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS) {
            this.handlePotentialDnsSrvRefresh();
        }
    }

    private synchronized void handlePotentialDnsSrvRefresh() {
        boolean refreshAllowed;
        CoreContext ctx = this.core.context();
        CoreEnvironment env = ctx.environment();
        boolean isValidDnsSrv = this.connectionString != null && this.connectionString.isValidDnsSrv() && env.ioConfig().dnsSrvEnabled();
        boolean tlsEnabled = env.securityConfig().tlsEnabled();
        boolean enoughTimeElapsed = this.lastDnsSrvLookup.hasElapsed(MIN_TIME_BETWEEN_DNS_LOOKUPS);
        boolean bl = refreshAllowed = isValidDnsSrv && enoughTimeElapsed;
        if (refreshAllowed) {
            this.lastDnsSrvLookup = NanoTimestamp.now();
            NanoTimestamp started = NanoTimestamp.now();
            Schedulers.boundedElastic().schedule(() -> {
                try {
                    List<String> foundNodes = this.performDnsSrvLookup(tlsEnabled);
                    if (foundNodes.isEmpty()) {
                        env.eventBus().publish(new DnsSrvRefreshAttemptFailedEvent(started.elapsed(), ctx, DnsSrvRefreshAttemptFailedEvent.Reason.NO_NEW_SEEDS_RETURNED, null));
                        return;
                    }
                    Set<SeedNode> seedNodes = foundNodes.stream().map(SeedNode::create).collect(Collectors.toSet());
                    ProposedGlobalConfigContext foundGlobalConfig = this.fetchGlobalConfigs(seedNodes, tlsEnabled, true, false).block();
                    if (foundGlobalConfig != null) {
                        this.proposeGlobalConfig(foundGlobalConfig.forceOverride());
                    }
                    for (String name : this.keyValueRefresher.registered()) {
                        ProposedBucketConfigContext bucketConfig = this.fetchBucketConfigs(name, seedNodes, tlsEnabled).block();
                        if (bucketConfig == null) continue;
                        this.proposeBucketConfig(bucketConfig.forceOverride());
                    }
                    env.eventBus().publish(new DnsSrvRefreshAttemptCompletedEvent(started.elapsed(), ctx, foundNodes));
                }
                catch (Exception e) {
                    env.eventBus().publish(new DnsSrvRefreshAttemptFailedEvent(started.elapsed(), ctx, DnsSrvRefreshAttemptFailedEvent.Reason.OTHER, e));
                }
            });
        }
    }

    protected List<String> performDnsSrvLookup(boolean tlsEnabled) throws NamingException {
        return ConnectionStringUtil.fromDnsSrvOrThrowIfTlsRequired(this.connectionString.hosts().get(0).hostname(), tlsEnabled);
    }

    private Mono<ProposedBucketConfigContext> fetchBucketConfigs(String name, Set<SeedNode> seedNodes, boolean tls) {
        int kvPort = tls ? 11207 : 11210;
        int managerPort = tls ? 18091 : 8091;
        Optional<String> alternate = this.core.context().alternateAddress();
        return Flux.range(1, Math.min(5, seedNodes.size())).flatMap(index -> Flux.fromIterable(seedNodes).take(Math.min(index, seedNodes.size())).last().flatMap(seed -> {
            int mappedManagerPort;
            int mappedKvPort;
            NodeIdentifier identifier = new NodeIdentifier(seed.address(), seed.clusterManagerPort().orElse(8091));
            AtomicReference alternatePorts = new AtomicReference();
            Optional<String> alternateAddress = alternate.map(a -> this.mapAlternateAddress((String)a, (SeedNode)seed, tls, alternatePorts));
            if (alternateAddress.isPresent()) {
                Map ports = (Map)alternatePorts.get();
                mappedKvPort = (Integer)ports.get((Object)ServiceType.KV);
                mappedManagerPort = (Integer)ports.get((Object)ServiceType.MANAGER);
            } else {
                mappedKvPort = seed.kvPort().orElse(kvPort);
                mappedManagerPort = seed.clusterManagerPort().orElse(managerPort);
            }
            return this.loadBucketConfigForSeed(identifier, mappedKvPort, mappedManagerPort, name, alternateAddress);
        }).retryWhen(Retry.from(companion -> companion.flatMap(rs -> {
            Throwable f = rs.failure();
            if (this.shutdown.get()) {
                return Mono.error(new AlreadyShutdownException());
            }
            if (f instanceof UnsupportedConfigMechanismException) {
                return Mono.error(Exceptions.propagate(f));
            }
            boolean bucketNotFound = f instanceof BucketNotFoundDuringLoadException;
            boolean bucketNotReady = f instanceof BucketNotReadyDuringLoadException;
            Duration delay = bucketNotFound || bucketNotReady ? Duration.ofMillis(500L) : Duration.ofMillis(1L);
            this.eventBus.publish(new BucketOpenRetriedEvent(name, delay, this.core.context(), f));
            return Mono.just(rs.totalRetries()).delayElement(delay, this.core.context().environment().scheduler());
        })))).next();
    }

    private Mono<ProposedGlobalConfigContext> fetchGlobalConfigs(Set<SeedNode> seedNodes, boolean tls, boolean allowStaleSeeds, boolean retryTimeouts) {
        AtomicBoolean hasErrored = new AtomicBoolean();
        int kvPort = tls ? 11207 : 11210;
        return Flux.range(1, Math.min(5, seedNodes.size())).flatMap(index -> Flux.fromIterable(seedNodes).take(Math.min(index, seedNodes.size())).last().flatMap(seed -> {
            NanoTimestamp start = NanoTimestamp.now();
            if (!allowStaleSeeds && !this.currentSeedNodes().contains(seed)) {
                return Mono.empty();
            }
            NodeIdentifier identifier = new NodeIdentifier(seed.address(), seed.clusterManagerPort().orElse(8091));
            return this.globalLoader.load(identifier, seed.kvPort().orElse(kvPort)).doOnError(throwable -> this.core.context().environment().eventBus().publish(new IndividualGlobalConfigLoadFailedEvent(start.elapsed(), this.core.context(), (Throwable)throwable, seed.address())));
        }).retryWhen(Retry.from(companion -> companion.flatMap(rs -> {
            Throwable f = rs.failure();
            if (this.shutdown.get()) {
                return Mono.error(new AlreadyShutdownException());
            }
            if (f instanceof UnsupportedConfigMechanismException) {
                return Mono.error(Exceptions.propagate(f));
            }
            if (!retryTimeouts && f.getCause() instanceof TimeoutException) {
                return Mono.error(f.getCause());
            }
            Duration delay = Duration.ofMillis(1L);
            this.eventBus.publish(new GlobalConfigRetriedEvent(delay, this.core.context(), f));
            return Mono.just(rs.totalRetries()).delayElement(delay, this.core.context().environment().scheduler());
        }))).onErrorResume(throwable -> {
            if (hasErrored.compareAndSet(false, true)) {
                return Mono.error(throwable);
            }
            return Mono.empty();
        })).next();
    }

    Set<SeedNode> currentSeedNodes() {
        return this.currentSeedNodes.get();
    }

    private synchronized void setSeedNodes(Set<SeedNode> seedNodes) {
        this.currentSeedNodes.set(seedNodes);
        Sinks.EmitResult emitResult = this.seedNodesSink.tryEmitNext(seedNodes);
        if (emitResult != Sinks.EmitResult.OK) {
            this.eventBus.publish(new SeedNodesUpdateFailedEvent(this.core.context(), emitResult));
        }
    }

    public static class AlternateAddressHolder {
        private final String hostname;
        private final Map<String, AlternateAddress> alternateAddresses;

        AlternateAddressHolder(String hostname, Map<String, AlternateAddress> alternateAddresses) {
            this.hostname = hostname;
            this.alternateAddresses = alternateAddresses;
        }

        public String hostname() {
            return this.hostname;
        }

        public Map<String, AlternateAddress> alternateAddresses() {
            return this.alternateAddresses;
        }
    }
}

