package com.couchbase.client.core.config.refresher;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.config.BucketConfigRefreshFailedEvent;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.ConfigRefreshFailure;
import com.couchbase.client.core.config.ConfigVersion;
import com.couchbase.client.core.config.ConfigurationProvider;
import com.couchbase.client.core.config.NodeInfo;
import com.couchbase.client.core.config.ProposedBucketConfigContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.CarrierBucketConfigRequest;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link BucketRefresher} that periodically fetches bucket configurations from KV nodes
 * by sending {@link CarrierBucketConfigRequest}s and proposing each successful response
 * to the {@link ConfigurationProvider}.
 *
 * <p>A single subscription, created in the constructor, merges a fixed-interval ticker with
 * the provider's config change notifications. On every tick each registered bucket is
 * considered for a refresh; see {@link #maybeUpdateBucket(String, boolean)} for the rules.
 *
 * <p>State is kept in concurrent collections because registration, tainting and the
 * polling pipeline may touch it from different threads.
 */
@Stability.Internal
public class KeyValueBucketRefresher implements BucketRefresher {

    /** Interval at which the internal ticker fires (not the per-bucket poll interval). */
    static final Duration POLLER_INTERVAL = Duration.ofSeconds(1);

    /** Number of eligible nodes that are asked for a config during a single refresh attempt. */
    static final int MAX_PARALLEL_FETCH = 1;

    private final Core core;

    /** Handle of the polling subscription; disposed on {@link #shutdown()}. */
    private final Disposable pollRegistration;

    private final ConfigurationProvider provider;

    /** Monotonically increasing offset used to rotate the node list between attempts. */
    private final AtomicLong nodeOffset = new AtomicLong(0);

    /** Registered bucket name -> timestamp of the last completed refresh attempt. */
    private final Map<String, NanoTimestamp> registrations = new ConcurrentHashMap<>();

    /** Registered bucket name -> number of failed refreshes since the last success/reset. */
    private final Map<String, AtomicInteger> numFailedRefreshes = new ConcurrentHashMap<>();

    /** Buckets marked as tainted; these are refreshed on every tick. */
    private final Set<String> tainted = ConcurrentHashMap.newKeySet();

    /** Configured minimum time between two regular refreshes of the same bucket. */
    private final Duration configPollInterval;

    /** Timeout applied to each config request; derived from {@link #configPollInterval}. */
    private final Duration configRequestTimeout;

    private final EventBus eventBus;

    /**
     * Creates the refresher and immediately starts the polling subscription.
     *
     * @param configurationProvider receives proposed configs and failure signals, and supplies
     *                              the change notifications that are merged with the ticker.
     * @param core                  used to send requests and to access the environment.
     */
    public KeyValueBucketRefresher(ConfigurationProvider configurationProvider, Core core) {
        this.core = core;
        this.eventBus = core.context().environment().eventBus();
        this.provider = configurationProvider;
        this.configPollInterval = core.context().environment().ioConfig().configPollInterval();
        this.configRequestTimeout = clampConfigRequestTimeout(this.configPollInterval);

        this.pollRegistration = Flux
            .merge(
                Flux.interval(pollerInterval(), core.context().environment().scheduler()),
                configurationProvider.configChangeNotifications()
            )
            // Ticks are dropped rather than buffered if downstream cannot keep up.
            .onBackpressureDrop()
            // Nothing to do while no bucket is registered.
            .filter(tick -> !this.registrations.isEmpty())
            // A tick value of -1 forces a refresh regardless of the poll interval.
            // NOTE(review): presumably emitted by configChangeNotifications() - confirm.
            .flatMap(tick -> Flux
                .fromIterable(this.registrations.keySet())
                .flatMap(name -> maybeUpdateBucket(name, tick.longValue() == -1)))
            .subscribe(configurationProvider::proposeBucketConfig);
    }

    /**
     * Returns the interval of the internal ticker. Overridable so subclasses can use a
     * different interval; note that it is invoked from the constructor.
     *
     * @return the ticker interval, {@link #POLLER_INTERVAL} by default.
     */
    protected Duration pollerInterval() {
        return POLLER_INTERVAL;
    }

    /**
     * Derives the config request timeout from the poll interval by constraining it to the
     * range of 1 to 5 seconds (inclusive).
     *
     * @param duration the configured poll interval.
     * @return {@code duration} if within range, otherwise the nearest bound.
     */
    public static Duration clampConfigRequestTimeout(Duration duration) {
        return constrainToRange(duration, Duration.ofSeconds(1L), Duration.ofSeconds(5L));
    }

    /**
     * Constrains {@code value} to the closed range {@code [min, max]}.
     *
     * @throws IllegalArgumentException if {@code min} is greater than {@code max}.
     */
    private static Duration constrainToRange(Duration value, Duration min, Duration max) {
        if (min.compareTo(max) > 0) {
            throw new IllegalArgumentException("min duration " + min + " must be <= max duration " + max);
        }
        if (value.compareTo(min) < 0) {
            return min;
        }
        if (value.compareTo(max) > 0) {
            return max;
        }
        return value;
    }

    /**
     * Refreshes the given bucket if it is due: either the refresh is forced, the bucket is
     * tainted, or the poll interval has elapsed since the last attempt.
     *
     * <p>If the failure counter has reached the number of eligible nodes, the provider is
     * told that all nodes were tried without success and the counter starts over.
     *
     * @param name  the bucket name.
     * @param force if true, the refresh happens regardless of the elapsed time.
     * @return the proposed config, or empty if no refresh was due, the bucket is no longer
     *         registered, or the fetch did not yield a config.
     */
    private Mono<ProposedBucketConfigContext> maybeUpdateBucket(String name, boolean force) {
        NanoTimestamp lastAttempt = this.registrations.get(name);
        AtomicInteger failures = this.numFailedRefreshes.get(name);
        if (lastAttempt == null || failures == null) {
            // The bucket was deregistered (or the refresher shut down) after the key set was
            // read. Skip it; throwing here would terminate the whole polling subscription.
            return Mono.empty();
        }

        boolean due = force
            || this.tainted.contains(name)
            || lastAttempt.hasElapsed(this.configPollInterval);
        if (!due) {
            return Mono.empty();
        }

        List<NodeInfo> eligibleNodes = filterEligibleNodes(name);
        if (failures.get() >= eligibleNodes.size()) {
            this.provider.signalConfigRefreshFailed(ConfigRefreshFailure.ALL_NODES_TRIED_ONCE_WITHOUT_SUCCESS);
            failures.set(0);
        }

        return fetchConfigPerNode(name, Flux.fromIterable(eligibleNodes).take(MAX_PARALLEL_FETCH))
            .next()
            // doOnSuccess also fires on empty completion, so the timestamp is updated after
            // every completed attempt, not only after one that produced a config.
            .doOnSuccess(ignored -> this.registrations.replace(name, NanoTimestamp.now()));
    }

    /**
     * Returns the nodes of the bucket's current config that expose the KV service (plain or
     * TLS), rotated so that consecutive calls start at a different node.
     *
     * <p>Publishes a {@link BucketConfigRefreshFailedEvent} and returns an empty list if the
     * provider has no config for the bucket.
     *
     * @param name the bucket name.
     * @return the eligible nodes, possibly empty.
     */
    private List<NodeInfo> filterEligibleNodes(String name) {
        BucketConfig bucketConfig = this.provider.config().bucketConfig(name);
        if (bucketConfig == null) {
            this.eventBus.publish(new BucketConfigRefreshFailedEvent(
                this.core.context(),
                BucketConfigRefreshFailedEvent.RefresherType.KV,
                BucketConfigRefreshFailedEvent.Reason.NO_BUCKET_FOUND,
                Optional.empty()
            ));
            return Collections.emptyList();
        }

        List<NodeInfo> nodes = new ArrayList<>(bucketConfig.nodes());
        shiftNodeList(nodes);
        return nodes.stream()
            .filter(node -> node.services().containsKey(ServiceType.KV)
                || node.sslServices().containsKey(ServiceType.KV))
            .collect(Collectors.toList());
    }

    /**
     * Sends a config request to each of the given nodes and emits a proposed config for every
     * successful response.
     *
     * <p>Unsuccessful responses and errors increment the bucket's failure counter, publish a
     * {@link BucketConfigRefreshFailedEvent} and are otherwise swallowed so that a single
     * failing node never terminates the stream.
     *
     * @param name  the bucket name.
     * @param nodes the nodes to query.
     * @return the proposed configs obtained from the nodes.
     */
    private Flux<ProposedBucketConfigContext> fetchConfigPerNode(String name, Flux<NodeInfo> nodes) {
        return nodes.flatMap(node -> {
            CarrierBucketConfigRequest request = new CarrierBucketConfigRequest(
                this.configRequestTimeout,
                this.core.context(),
                new CollectionIdentifier(name, Optional.empty(), Optional.empty()),
                FailFastRetryStrategy.INSTANCE,
                node.identifier(),
                currentVersion(name)
            );
            this.core.send(request);

            return Reactor.wrap(request, request.response(), true)
                .filter(response -> {
                    boolean success = response.status().success();
                    if (!success) {
                        recordFailedRefresh(name);
                        this.eventBus.publish(new BucketConfigRefreshFailedEvent(
                            this.core.context(),
                            BucketConfigRefreshFailedEvent.RefresherType.KV,
                            BucketConfigRefreshFailedEvent.Reason.INDIVIDUAL_REQUEST_FAILED,
                            Optional.of(response)
                        ));
                    }
                    return success;
                })
                .map(response -> new ProposedBucketConfigContext(
                    name,
                    new String(response.content(), StandardCharsets.UTF_8),
                    node.hostname()
                ))
                // doOnNext (not doOnSuccess): doOnSuccess also fires when the filter above
                // rejected the response, which would wipe the failure that was just counted.
                .doOnNext(ignored -> resetFailedRefreshes(name))
                .onErrorResume(failure -> {
                    recordFailedRefresh(name);
                    this.eventBus.publish(new BucketConfigRefreshFailedEvent(
                        this.core.context(),
                        BucketConfigRefreshFailedEvent.RefresherType.KV,
                        BucketConfigRefreshFailedEvent.Reason.INDIVIDUAL_REQUEST_FAILED,
                        Optional.of(failure)
                    ));
                    return Mono.empty();
                });
        });
    }

    /**
     * Increments the failure counter of the bucket. Does nothing if the bucket has been
     * deregistered in the meantime (e.g. while a request was in flight).
     */
    private void recordFailedRefresh(String name) {
        AtomicInteger failures = this.numFailedRefreshes.get(name);
        if (failures != null) {
            failures.incrementAndGet();
        }
    }

    /**
     * Resets the failure counter of the bucket to zero. Does nothing if the bucket has been
     * deregistered in the meantime.
     */
    private void resetFailedRefreshes(String name) {
        AtomicInteger failures = this.numFailedRefreshes.get(name);
        if (failures != null) {
            failures.set(0);
        }
    }

    /**
     * Returns the version of the bucket config currently held by the provider, or
     * {@link ConfigVersion#ZERO} if there is none.
     */
    private ConfigVersion currentVersion(String name) {
        BucketConfig bucketConfig = this.provider.config().bucketConfig(name);
        return bucketConfig == null ? ConfigVersion.ZERO : bucketConfig.version();
    }

    /**
     * Rotates the list in place by the current node offset and advances the offset, so that
     * successive calls start with a different element.
     *
     * @param list the list to rotate; an empty list is left untouched.
     */
    private <T> void shiftNodeList(List<T> list) {
        if (list.isEmpty()) {
            // Guard against "% 0", which would throw ArithmeticException.
            return;
        }
        int shiftBy = (int) (this.nodeOffset.getAndIncrement() % list.size());
        Collections.rotate(list, -shiftBy);
    }

    /**
     * Registers a bucket for polling. The work is deferred until the returned Mono is
     * subscribed to.
     *
     * @param str the bucket name.
     * @return a Mono that completes once the bucket is registered.
     */
    @Override
    public Mono<Void> register(String str) {
        return Mono.defer(() -> {
            // Create the counter first: the poller discovers buckets through
            // registrations, so the counter must exist before the bucket becomes visible.
            this.numFailedRefreshes.put(str, new AtomicInteger(0));
            this.registrations.put(str, NanoTimestamp.never());
            return Mono.empty();
        });
    }

    /**
     * Removes a bucket from polling. The work is deferred until the returned Mono is
     * subscribed to.
     *
     * @param str the bucket name.
     * @return a Mono that completes once the bucket is deregistered.
     */
    @Override
    public Mono<Void> deregister(String str) {
        return Mono.defer(() -> {
            this.registrations.remove(str);
            this.numFailedRefreshes.remove(str);
            return Mono.empty();
        });
    }

    /**
     * Marks the bucket as tainted, which makes it eligible for a refresh on every tick.
     *
     * @param str the bucket name.
     */
    @Override
    public void markTainted(String str) {
        this.tainted.add(str);
    }

    /**
     * Removes the tainted mark from the bucket.
     *
     * @param str the bucket name.
     */
    @Override
    public void markUntainted(String str) {
        this.tainted.remove(str);
    }

    /**
     * Stops polling and clears all state. The work is deferred until the returned Mono is
     * subscribed to.
     *
     * @return a Mono that completes once the refresher is shut down.
     */
    @Override
    public Mono<Void> shutdown() {
        return Mono.defer(() -> {
            if (!this.pollRegistration.isDisposed()) {
                this.pollRegistration.dispose();
            }
            this.numFailedRefreshes.clear();
            this.registrations.clear();
            this.tainted.clear();
            return Mono.empty();
        });
    }

    /**
     * Returns the names of the currently registered buckets.
     *
     * @return the key set of the registration map; this is a live view, not a snapshot.
     */
    @Override
    public Set<String> registered() {
        return this.registrations.keySet();
    }
}
