/*
 * Decompiled with CFR 0.152.
 */
package com.couchbase.client.core.diagnostics;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.AbstractEvent;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.events.core.WaitUntilReadyCompletedEvent;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ArrayNode;
import com.couchbase.client.core.deps.com.fasterxml.jackson.databind.node.ObjectNode;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.DefaultFullHttpRequest;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpMethod;
import com.couchbase.client.core.deps.io.netty.handler.codec.http.HttpVersion;
import com.couchbase.client.core.diagnostics.ClusterState;
import com.couchbase.client.core.diagnostics.EndpointDiagnostics;
import com.couchbase.client.core.diagnostics.HealthPinger;
import com.couchbase.client.core.diagnostics.PingState;
import com.couchbase.client.core.diagnostics.WaitUntilReadyContext;
import com.couchbase.client.core.endpoint.http.CoreCommonOptions;
import com.couchbase.client.core.endpoint.http.CoreHttpPath;
import com.couchbase.client.core.error.UnambiguousTimeoutException;
import com.couchbase.client.core.error.context.CancellationErrorContext;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.msg.RequestTarget;
import com.couchbase.client.core.msg.ResponseStatus;
import com.couchbase.client.core.msg.manager.GenericManagerRequest;
import com.couchbase.client.core.retry.FailFastRetryStrategy;
import com.couchbase.client.core.service.ServiceType;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.Deadline;
import com.couchbase.client.core.util.NanoTimestamp;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.retry.Retry;
import reactor.util.retry.RetryBackoffSpec;

@Stability.Internal
public class WaitUntilReadyHelper {
    /**
     * Creates the Reactor backoff retry spec used by the wait-until-ready stages.
     *
     * <p>The spec allows {@code Long.MAX_VALUE} attempts, starts from a minimum
     * backoff of 10 milliseconds and applies a jitter factor of 0.5.
     *
     * @param maxBackoff upper bound for the delay between two attempts.
     * @return the configured retry spec.
     */
    private static RetryBackoffSpec retryWithMaxBackoff(Duration maxBackoff) {
        Duration minBackoff = Duration.ofMillis(10L);
        RetryBackoffSpec uncapped = Retry.backoff(Long.MAX_VALUE, minBackoff);
        return uncapped.maxBackoff(maxBackoff).jitter(0.5);
    }

    /**
     * Wraps a stage so that it is retried on every failure, logging the reason for
     * each retry.
     *
     * <p>A {@link NotReadyYetException} is logged as an ordinary "waiting because"
     * message; any other throwable is logged together with its stack trace. The
     * filter accepts every throwable, so it never stops the retry by itself.
     *
     * @param retrySpec the backoff spec to retry with.
     * @param stageName stage label used in the "unexpected exception" log message.
     * @param log receives the diagnostic messages.
     * @param waitUntilReadyStage the stage to retry.
     * @return the stage with the retry behaviour attached.
     */
    private static <T> Mono<T> retryUntilReady(RetryBackoffSpec retrySpec, String stageName, WaitUntilReadyLogger log, Mono<T> waitUntilReadyStage) {
        RetryBackoffSpec loggingSpec = retrySpec.filter(failure -> {
            if (!(failure instanceof NotReadyYetException)) {
                log.message("Unexpected exception while waiting for " + stageName + ": " + CbThrowables.getStackTraceAsString(failure));
                return true;
            }
            log.waitingBecause(failure.getMessage());
            return true;
        });
        return waitUntilReadyStage.retryWhen(loggingSpec);
    }

    /**
     * Waits until configuration loading has settled.
     *
     * <p>One attempt is considered "not ready yet" when a global config load or a
     * bucket config load is in progress. If a bucket name is given, the attempt is
     * also not ready while a collection refresh is in progress or while the cluster
     * config has no config for that bucket. The check is retried with a maximum
     * backoff of 100 milliseconds.
     *
     * @param core the core whose configuration provider and cluster config are inspected.
     * @param bucketName the bucket to wait for, or {@code null} to skip the bucket-specific checks.
     * @param log receives the diagnostic messages.
     * @return a Mono that completes once none of the checks reports "not ready".
     */
    private static Mono<Void> waitForConfig(Core core, @Nullable String bucketName, WaitUntilReadyLogger log) {
        Mono<Void> stage = Mono.fromRunnable(() -> {
            // Find the first reason (if any) why the config is not usable yet.
            String notReadyReason = null;
            if (core.configurationProvider().globalConfigLoadInProgress()) {
                notReadyReason = "global config load is in progress";
            } else if (core.configurationProvider().bucketConfigLoadInProgress()) {
                notReadyReason = "bucket config load is in progress";
            } else if (bucketName != null) {
                // The remaining checks only apply when a bucket was named.
                if (core.configurationProvider().collectionRefreshInProgress()) {
                    notReadyReason = "collection refresh is in progress for bucket " + bucketName;
                } else if (core.clusterConfig().bucketConfig(bucketName) == null) {
                    notReadyReason = "cluster config does not yet have config for bucket " + bucketName;
                }
            }
            if (notReadyReason != null) {
                throw new NotReadyYetException(notReadyReason);
            }
        });
        RetryBackoffSpec retrySpec = WaitUntilReadyHelper.retryWithMaxBackoff(Duration.ofMillis(100L));
        return WaitUntilReadyHelper.retryUntilReady(retrySpec, "config load", log, stage);
    }

    /**
     * Waits until every node listed for the bucket reports the status "healthy".
     *
     * <p>Each attempt sends a manager GET request for {@code /pools/default/buckets/<bucket>}
     * and counts the entries of the response's {@code nodes} array whose {@code status}
     * text equals {@code "healthy"}. A non-successful response, or any node that is not
     * healthy, makes the attempt fail with a {@link NotReadyYetException}; attempts are
     * retried with a maximum backoff of 2 seconds.
     *
     * @param core the core used to send the manager request.
     * @param bucketName the bucket to check; when {@code null} the check is skipped and only a message is logged.
     * @param log receives the diagnostic messages.
     * @return a Mono that completes once all nodes are healthy (or immediately when skipped).
     */
    private static Mono<Void> waitForNodeHealth(Core core, @Nullable String bucketName, WaitUntilReadyLogger log) {
        if (bucketName == null) {
            return Mono.fromRunnable(() -> log.message("Skipping node health check because no bucket name was specified."));
        }

        // Deferred so that every retry builds and sends a fresh request.
        Mono<Void> stage = Mono.defer(() -> {
            String httpPath = CoreHttpPath.formatPath("/pools/default/buckets/{}", bucketName);
            GenericManagerRequest request = new GenericManagerRequest(
                core.context(),
                () -> new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, httpPath),
                true,
                null
            );
            log.message("Sending manager request to check bucket health; httpPath=" + httpPath);
            core.send(request);

            return Reactor.wrap(request, request.response(), true).flatMap(response -> {
                if (response.status() != ResponseStatus.SUCCESS) {
                    return Mono.error(new NotReadyYetException(
                        "Manager request to check bucket health failed with response status " + response.status()
                            + "; httpStatusCode=" + response.httpStatus()
                            + ", responseBody=" + new String(response.content(), StandardCharsets.UTF_8)
                            + ", requestContext=" + request.context()));
                }

                ObjectNode root = (ObjectNode) Mapper.decodeIntoTree(response.content());
                ArrayNode nodes = (ArrayNode) root.get("nodes");
                long healthyCount = StreamSupport.stream(nodes.spliterator(), false)
                    .filter(node -> node.get("status").asText().equals("healthy"))
                    .count();
                long totalCount = nodes.size();

                if (totalCount == healthyCount) {
                    log.message("All " + healthyCount + " nodes are healthy");
                    return Mono.empty();
                }
                return Mono.error(new NotReadyYetException(healthyCount + " of " + nodes.size() + " nodes are healthy"));
            });
        });

        RetryBackoffSpec retrySpec = WaitUntilReadyHelper.retryWithMaxBackoff(Duration.ofSeconds(2L));
        return WaitUntilReadyHelper.retryUntilReady(retrySpec, "checking bucket health", log, stage);
    }

    /**
     * Runs the wait-until-ready stages in sequence: config, node health, then successful pings.
     *
     * <p>If the overall {@code timeout} elapses first, the returned future fails with an
     * {@link UnambiguousTimeoutException} whose message names the stage that was active and
     * whose context carries the current diagnostics and the ping targets still outstanding.
     * On success the state moves to {@code COMPLETE} and a {@link WaitUntilReadyCompletedEvent}
     * is published on the environment's event bus.
     *
     * @param core the core to wait on.
     * @param serviceTypes the services to check; {@code null} is treated as an empty set.
     * @param timeout the overall time budget for all stages.
     * @param desiredState the cluster state to wait for.
     * @param bucketName the bucket to wait for, if any.
     * @return a future that completes when all stages have finished, or fails on timeout.
     */
    public static CompletableFuture<Void> waitUntilReady(Core core, @Nullable Set<ServiceType> serviceTypes, Duration timeout, ClusterState desiredState, Optional<String> bucketName) {
        WaitUntilReadyLogger log = WaitUntilReadyLogger.create(core.environment().eventBus());
        log.message("Starting WaitUntilReady. serviceTypes=" + serviceTypes + ", timeout=" + timeout + ", desiredState=" + desiredState + ", bucketName=" + bucketName);

        // NOTE(review): presumably a deadline at 90% of the overall timeout, leaving headroom
        // for the outer timeout to fire last -- confirm against Deadline.of.
        Deadline pingDeadline = Deadline.of(timeout, 0.9);
        WaitUntilReadyState state = new WaitUntilReadyState(log);

        Set<ServiceType> requestedServices;
        if (serviceTypes == null) {
            requestedServices = Collections.emptySet();
        } else {
            requestedServices = CbCollections.setCopyOf(serviceTypes);
        }
        // Shared with the ping stage, which narrows the services and tracks open targets.
        AtomicReference<Set<ServiceType>> servicesToCheck = new AtomicReference<>(requestedServices);
        Set<RequestTarget> remainingPingTargets = ConcurrentHashMap.newKeySet();
        String bucketOrNull = bucketName.orElse(null);

        Mono<WaitUntilReadyCompletedEvent.Reason> stages = Mono.empty()
            .then(Mono.fromRunnable(() -> state.transition(WaitUntilReadyStage.WAIT_FOR_CONFIG)))
            .then(WaitUntilReadyHelper.waitForConfig(core, bucketOrNull, log))
            .then(Mono.fromRunnable(() -> state.transition(WaitUntilReadyStage.WAIT_FOR_HEALTHY_NODES)))
            .then(WaitUntilReadyHelper.waitForNodeHealth(core, bucketOrNull, log))
            .then(Mono.fromRunnable(() -> state.transition(WaitUntilReadyStage.WAIT_FOR_SUCCESSFUL_PING)))
            .then(WaitUntilReadyHelper.waitForSuccessfulPing(core, bucketOrNull, desiredState, servicesToCheck, pingDeadline, remainingPingTargets, log));

        // Built lazily so the context reflects the state at the moment the timeout fires.
        Mono<WaitUntilReadyCompletedEvent.Reason> timeoutFallback = Mono.defer(() -> {
            log.message("WaitUntilReady timed out :-(");
            WaitUntilReadyContext timeoutContext = new WaitUntilReadyContext(
                servicesToCheck.get(),
                timeout,
                desiredState,
                bucketName,
                core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)),
                state,
                CbCollections.setCopyOf(remainingPingTargets)
            );
            CancellationErrorContext errorContext = new CancellationErrorContext(timeoutContext);
            return Mono.error(new UnambiguousTimeoutException("WaitUntilReady timed out in stage " + state.currentStage + " (spent " + state.currentStart.elapsed() + " in that stage)", errorContext));
        });

        return stages
            .timeout(timeout, timeoutFallback, core.context().environment().scheduler())
            .doOnSuccess(completionReason -> {
                state.transition(WaitUntilReadyStage.COMPLETE);
                WaitUntilReadyContext completedContext = new WaitUntilReadyContext(
                    servicesToCheck.get(),
                    timeout,
                    desiredState,
                    bucketName,
                    core.diagnostics().collect(Collectors.groupingBy(EndpointDiagnostics::type)),
                    state,
                    Collections.emptySet()
                );
                core.context().environment().eventBus().publish(new WaitUntilReadyCompletedEvent(completedContext, completionReason));
            })
            .then()
            .toFuture();
    }

    /**
     * Pings the targets derived from the current cluster config until the desired state is reached.
     *
     * <p>The work is deferred until subscription. When no bucket name is given and the cluster
     * config has neither a cluster nor a bucket config, nothing is pinged and the result is
     * {@code CLUSTER_LEVEL_NOT_SUPPORTED}. Otherwise every extracted target is pinged (retrying
     * failed pings with a maximum backoff of 1 second) and the result is {@code SUCCESS}.
     *
     * @param core the core whose cluster config supplies the ping targets and which sends the pings.
     * @param bucketName the bucket to ping against, or {@code null} for none.
     * @param desiredState {@code ONLINE} waits for every ping to finish; any other value stops as soon
     *        as each awaited service type has produced one successful ping.
     * @param serviceTypes in: the services to consider; out: replaced by the service types of the
     *        targets that were actually extracted.
     * @param deadline source of the per-ping timeout.
     * @param remainingPingTargets filled with all extracted targets; a target is removed once its ping
     *        succeeds or once it is found to have left the cluster.
     * @param log receives the diagnostic messages.
     * @return a Mono emitting the reason the stage completed.
     */
    private static Mono<WaitUntilReadyCompletedEvent.Reason> waitForSuccessfulPing(Core core, @Nullable String bucketName, ClusterState desiredState, AtomicReference<Set<ServiceType>> serviceTypes, Deadline deadline, Set<RequestTarget> remainingPingTargets, WaitUntilReadyLogger log) {
        return Mono.defer(() -> {
            // Cluster-level wait without any usable config: report "not supported" instead of pinging.
            if (bucketName == null && !core.clusterConfig().hasClusterOrBucketConfig()) {
                log.message("cluster.waitUntilReady() completed without action, because it was run against a Couchbase Server version which does not support it (only supported with 6.5 and later). Please open at least one bucket, and call bucket.waitUntilReady() instead.");
                return Mono.just(WaitUntilReadyCompletedEvent.Reason.CLUSTER_LEVEL_NOT_SUPPORTED);
            }
            Optional<String> maybeBucketName = Optional.ofNullable(bucketName);
            // Snapshot of the targets to ping, taken once at subscription time.
            Set<RequestTarget> initialPingTargets = CbCollections.setCopyOf(HealthPinger.extractPingTargets(core.clusterConfig(), (Set)serviceTypes.get(), maybeBucketName, log));
            remainingPingTargets.addAll(initialPingTargets);
            // Narrow the shared service set to the services that actually have a ping target.
            serviceTypes.set(initialPingTargets.stream().map(RequestTarget::serviceType).collect(Collectors.toSet()));
            // Service types that have not yet produced a successful ping.
            ConcurrentHashMap.KeySetView offline = ConcurrentHashMap.newKeySet();
            offline.addAll((Collection)serviceTypes.get());
            return Flux.fromIterable(initialPingTargets).flatMap(target -> {
                // The ping timeout is the time left on the deadline, or 10 seconds if remaining() is empty.
                Mono ping = HealthPinger.pingTarget(core, target, CoreCommonOptions.of(deadline.remaining().orElse(Duration.ofSeconds(10L)), FailFastRetryStrategy.INSTANCE, null), log).flatMap(report -> {
                    if (report.state() == PingState.OK) {
                        remainingPingTargets.remove(target);
                        return Mono.just(report);
                    }
                    // Ping failed: re-read the targets from the current config (without logging) to see
                    // whether this target is still part of the cluster.
                    Set<RequestTarget> currentPingTargets = HealthPinger.extractPingTargets(core.clusterConfig(), (Set)serviceTypes.get(), maybeBucketName, WaitUntilReadyLogger.dummy);
                    if (!currentPingTargets.contains(target)) {
                        log.message("Ignoring ping target " + target + " because it's no longer part of the cluster.");
                        remainingPingTargets.remove(target);
                        // Completes without a report, so this target no longer blocks the stage.
                        return Mono.empty();
                    }
                    // Still a valid target: signal "not ready" so the ping is retried.
                    return Mono.error(new NotReadyYetException("ping for target " + target + " failed with status: " + (Object)((Object)report.state())));
                });
                return WaitUntilReadyHelper.retryUntilReady(WaitUntilReadyHelper.retryWithMaxBackoff(Duration.ofSeconds(1L)), "ping " + target, log, ping);
            }).takeUntil(report -> {
                // ONLINE never stops early; the flux then ends only when every target's ping has completed.
                if (desiredState == ClusterState.ONLINE) {
                    return false;
                }
                // NOTE(review): this branch runs for every desiredState other than ONLINE, although the
                // log message below names 'DEGRADED' -- confirm no other state can reach here.
                boolean firstSuccessfulPingForService = offline.remove((Object)report.type());
                if (firstSuccessfulPingForService) {
                    log.message("At least one " + (Object)((Object)report.type()) + " ping was successful.");
                }
                if (offline.isEmpty()) {
                    log.message("At least one ping was successful for each awaited service; desired cluster state 'DEGRADED' is now satisfied.");
                    return true;
                }
                return false;
            }).then(Mono.just(WaitUntilReadyCompletedEvent.Reason.SUCCESS));
        });
    }

    /**
     * Receives the diagnostic messages produced while waiting.
     *
     * <p>Both methods have default implementations, so an implementation that overrides
     * nothing discards every message.
     */
    @Stability.Internal
    public interface WaitUntilReadyLogger {
        /** A logger that discards every message. */
        WaitUntilReadyLogger dummy = new WaitUntilReadyLogger(){};

        /**
         * Creates a logger that publishes each message as an event on the given event bus.
         *
         * <p>Every message is prefixed with a random UUID generated once per logger. If
         * publishing does not return {@code SUCCESS}, the message is written to
         * {@code System.err} instead.
         *
         * @param eventBus the bus to publish the diagnostic events on.
         * @return the new logger.
         */
        static WaitUntilReadyLogger create(final EventBus eventBus) {
            final String prefix = UUID.randomUUID().toString() + " ";
            return new WaitUntilReadyLogger(){

                @Override
                public void message(String message) {
                    String tagged = prefix + message;
                    if (eventBus.publish(new WaitUntilReadyDiagnostic(tagged)) != EventBus.PublishResult.SUCCESS) {
                        // The bus did not accept the event; fall back to stderr so the message is not lost.
                        System.err.println("[WaitUntilReadyDiagnostic] " + tagged);
                    }
                }
            };
        }

        /**
         * Logs a diagnostic message. The default implementation does nothing.
         *
         * @param message the text to log.
         */
        default void message(String message) {
        }

        /**
         * Logs the reason a stage is still waiting, by passing "Waiting because " plus the
         * reason to {@link #message(String)}.
         *
         * @param message the reason for waiting.
         */
        default void waitingBecause(String message) {
            this.message("Waiting because " + message);
        }
    }

    /**
     * Tracks which stage a wait-until-ready operation is in and how long each stage took.
     */
    @Stability.Internal
    public static class WaitUntilReadyState {
        /** Milliseconds spent in each stage that has already been left. */
        private final Map<WaitUntilReadyStage, Long> timings = new ConcurrentHashMap<>();
        /** Sum, in milliseconds, of the time spent before each transition so far. */
        private final AtomicLong totalDuration = new AtomicLong();
        private volatile WaitUntilReadyStage currentStage = WaitUntilReadyStage.INITIAL;
        /** When the current stage was entered. */
        private volatile NanoTimestamp currentStart = NanoTimestamp.now();
        private final WaitUntilReadyLogger log;

        /**
         * @param log receives the stage transition messages; must not be null.
         */
        public WaitUntilReadyState(WaitUntilReadyLogger log) {
            this.log = Objects.requireNonNull(log);
        }

        /**
         * Leaves the current stage and enters {@code next}.
         *
         * <p>The time spent in the stage being left is added to the running total. Unless
         * that stage is {@code INITIAL}, it is also recorded in the per-stage timings and logged.
         *
         * @param next the stage to enter.
         */
        void transition(WaitUntilReadyStage next) {
            long spentMs = this.currentStart.elapsed().toMillis();
            WaitUntilReadyStage previous = this.currentStage;
            if (previous != WaitUntilReadyStage.INITIAL) {
                this.timings.put(previous, spentMs);
                this.log.message("Stage '" + previous + "' took " + this.currentStart.elapsed());
            }
            this.totalDuration.addAndGet(spentMs);
            Duration totalSoFar = Duration.ofMillis(this.totalDuration.get());
            this.log.message("Transitioning from stage " + previous + " to stage " + next + ". Total elapsed time since waiting started: " + totalSoFar);
            this.currentStage = next;
            this.currentStart = NanoTimestamp.now();
        }

        /**
         * Exports the state as a key-sorted map.
         *
         * <p>The map always contains {@code current_stage}, {@code total_ms} and
         * {@code timings_ms}. While the stage is not {@code COMPLETE} it also contains
         * {@code current_stage_since_ms}, and {@code total_ms} includes the time spent in
         * the current stage so far.
         *
         * @return the exported values.
         */
        public Map<String, Object> export() {
            Map<String, Object> exported = new TreeMap<>();
            exported.put("current_stage", this.currentStage);
            if (this.currentStage == WaitUntilReadyStage.COMPLETE) {
                exported.put("total_ms", this.totalDuration.get());
            } else {
                long inCurrentStageMs = this.currentStart.elapsed().toMillis();
                exported.put("current_stage_since_ms", inCurrentStageMs);
                exported.put("total_ms", this.totalDuration.get() + inCurrentStageMs);
            }
            Map<WaitUntilReadyStage, Long> sortedTimings = new TreeMap<>(this.timings);
            exported.put("timings_ms", sortedTimings);
            return exported;
        }
    }

    /**
     * Thrown by a wait-until-ready stage to signal that the awaited condition does not hold yet.
     *
     * <p>{@code retryUntilReady} logs this exception's message as a "waiting because" line,
     * whereas any other throwable is logged as an unexpected exception with its stack trace.
     */
    private static class NotReadyYetException
    extends RuntimeException {
        /**
         * @param message the reason the stage is not ready yet.
         */
        public NotReadyYetException(String message) {
            super(message);
        }
    }

    /**
     * The stages tracked by {@code WaitUntilReadyState}.
     */
    private static enum WaitUntilReadyStage {
        /** Starting value, before the first transition; no timing is recorded for it. */
        INITIAL,
        /** Entered before waiting for the config. */
        WAIT_FOR_CONFIG,
        /** Entered before the node health check. */
        WAIT_FOR_HEALTHY_NODES,
        /** Entered before pinging the targets. */
        WAIT_FOR_SUCCESSFUL_PING,
        /** Entered once all stages have succeeded. */
        COMPLETE;

    }

    /**
     * Event that carries a single wait-until-ready diagnostic message.
     */
    private static class WaitUntilReadyDiagnostic
    extends AbstractEvent {
        /** The text returned by {@link #description()}. */
        private final String message;

        /**
         * @param message the diagnostic text; must not be null.
         */
        protected WaitUntilReadyDiagnostic(String message) {
            // DEBUG severity, category "<CORE path>.WaitUntilReady", zero duration, and null for the
            // last argument (presumably the event context -- confirm against AbstractEvent).
            super(Event.Severity.DEBUG, Event.Category.CORE.path() + ".WaitUntilReady", Duration.ZERO, null);
            this.message = Objects.requireNonNull(message);
        }

        /**
         * @return the diagnostic message passed to the constructor.
         */
        @Override
        public String description() {
            return this.message;
        }
    }
}

