package com.couchbase.client.core.service.kv;

import com.couchbase.client.core.Core;
import com.couchbase.client.core.CoreContext;
import com.couchbase.client.core.Reactor;
import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.RequestSpan;
import com.couchbase.client.core.cnc.TracingIdentifiers;
import com.couchbase.client.core.cnc.events.request.IndividualReplicaGetFailedEvent;
import com.couchbase.client.core.config.BucketConfig;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.env.CoreEnvironment;
import com.couchbase.client.core.error.CommonExceptions;
import com.couchbase.client.core.error.CouchbaseException;
import com.couchbase.client.core.error.DefaultErrorUtil;
import com.couchbase.client.core.error.DocumentUnretrievableException;
import com.couchbase.client.core.error.context.AggregateErrorContext;
import com.couchbase.client.core.error.context.ErrorContext;
import com.couchbase.client.core.error.context.ReducedKeyValueErrorContext;
import com.couchbase.client.core.io.CollectionIdentifier;
import com.couchbase.client.core.msg.kv.GetRequest;
import com.couchbase.client.core.msg.kv.GetResponse;
import com.couchbase.client.core.msg.kv.ReplicaGetRequest;
import com.couchbase.client.core.retry.RetryStrategy;
import com.couchbase.client.core.util.Validators;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Stability.Internal
/* loaded from: input_file:com/couchbase/client/core/service/kv/ReplicaHelper.class */
public class ReplicaHelper {

    /* loaded from: input_file:com/couchbase/client/core/service/kv/ReplicaHelper$GetReplicaResponse.class */
    public static class GetReplicaResponse {
        private final GetResponse response;
        private final boolean fromReplica;

        public GetReplicaResponse(GetResponse getResponse, boolean z) {
            this.response = (GetResponse) Objects.requireNonNull(getResponse);
            this.fromReplica = z;
        }

        public boolean isFromReplica() {
            return this.fromReplica;
        }

        public GetResponse getResponse() {
            return this.response;
        }
    }

    private ReplicaHelper() {
        throw new AssertionError("not instantiable");
    }

    public static Mono<GetReplicaResponse> getAnyReplicaReactive(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan) {
        RequestSpan requestSpan2 = core.context().environment().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ANY_REPLICA, requestSpan);
        return getAllReplicasReactive(core, collectionIdentifier, str, duration, retryStrategy, map, requestSpan2).next().doFinally(signalType -> {
            requestSpan2.end();
        });
    }

    public static Flux<GetReplicaResponse> getAllReplicasReactive(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan) {
        Validators.notNullOrEmpty(str, "Id", (Supplier<ErrorContext>) () -> {
            return ReducedKeyValueErrorContext.create(str, collectionIdentifier);
        });
        CoreEnvironment environment = core.context().environment();
        RequestSpan requestSpan2 = environment.requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ALL_REPLICAS, requestSpan);
        requestSpan2.attribute(TracingIdentifiers.ATTR_SYSTEM, "couchbase");
        return Reactor.toMono(() -> {
            return getAllReplicasRequests(core, collectionIdentifier, str, map, retryStrategy, duration, requestSpan2);
        }).flux().flatMap(Flux::fromStream).flatMap(getRequest -> {
            return Reactor.wrap(getRequest, get(core, getRequest), true).onErrorResume(th -> {
                environment.eventBus().publish(new IndividualReplicaGetFailedEvent(getRequest.context()));
                return Mono.empty();
            }).map(getResponse -> {
                return new GetReplicaResponse(getResponse, getRequest instanceof ReplicaGetRequest);
            });
        }).doFinally(signalType -> {
            requestSpan2.end();
        });
    }

    public static <R> CompletableFuture<List<CompletableFuture<R>>> getAllReplicasAsync(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, Function<GetReplicaResponse, R> function) {
        RequestSpan requestSpan2 = core.context().environment().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ALL_REPLICAS, requestSpan);
        requestSpan2.attribute(TracingIdentifiers.ATTR_SYSTEM, "couchbase");
        return getAllReplicasRequests(core, collectionIdentifier, str, map, retryStrategy, duration, requestSpan2).thenApply(stream -> {
            return (List) stream.map(getRequest -> {
                return get(core, getRequest).thenApply(getResponse -> {
                    return new GetReplicaResponse(getResponse, getRequest instanceof ReplicaGetRequest);
                }).thenApply((Function<? super U, ? extends U>) function);
            }).collect(Collectors.toList());
        }).whenComplete((BiConsumer<? super U, ? super Throwable>) (list, th) -> {
            AtomicInteger atomicInteger = new AtomicInteger(list.size());
            Iterator it = list.iterator();
            while (it.hasNext()) {
                ((CompletableFuture) it.next()).whenComplete((obj, th) -> {
                    if (atomicInteger.decrementAndGet() == 0) {
                        requestSpan2.end();
                    }
                });
            }
        });
    }

    public static <R> CompletableFuture<R> getAnyReplicaAsync(Core core, CollectionIdentifier collectionIdentifier, String str, Duration duration, RetryStrategy retryStrategy, Map<String, Object> map, RequestSpan requestSpan, Function<GetReplicaResponse, R> function) {
        RequestSpan requestSpan2 = core.context().environment().requestTracer().requestSpan(TracingIdentifiers.SPAN_GET_ANY_REPLICA, requestSpan);
        CompletableFuture allReplicasAsync = getAllReplicasAsync(core, collectionIdentifier, str, duration, retryStrategy, map, requestSpan2, function);
        CompletableFuture completableFuture = new CompletableFuture();
        allReplicasAsync.whenComplete((list, th) -> {
            if (th != null) {
                completableFuture.completeExceptionally(th);
            }
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            List synchronizedList = Collections.synchronizedList(new ArrayList());
            list.forEach(completableFuture2 -> {
                completableFuture2.whenComplete((obj, th) -> {
                    int incrementAndGet = atomicInteger.incrementAndGet();
                    if (th != null && (th instanceof CompletionException) && (th.getCause() instanceof CouchbaseException)) {
                        synchronizedList.add(((CouchbaseException) th.getCause()).context());
                    }
                    if (obj != null && atomicBoolean.compareAndSet(false, true)) {
                        completableFuture.complete(obj);
                    }
                    if (atomicBoolean.get() || incrementAndGet != list.size()) {
                        return;
                    }
                    completableFuture.completeExceptionally(new DocumentUnretrievableException(new AggregateErrorContext(synchronizedList)));
                });
            });
        });
        return completableFuture.whenComplete((obj, th2) -> {
            requestSpan2.end();
        });
    }

    public static CompletableFuture<Stream<GetRequest>> getAllReplicasRequests(Core core, CollectionIdentifier collectionIdentifier, String str, Map<String, Object> map, RetryStrategy retryStrategy, Duration duration, RequestSpan requestSpan) {
        Validators.notNullOrEmpty(str, "Id");
        CoreContext context = core.context();
        CoreEnvironment environment = context.environment();
        BucketConfig bucketConfig = core.clusterConfig().bucketConfig(collectionIdentifier.bucket());
        if (!(bucketConfig instanceof CouchbaseBucketConfig)) {
            if (bucketConfig != null) {
                CompletableFuture<Stream<GetRequest>> completableFuture = new CompletableFuture<>();
                completableFuture.completeExceptionally(CommonExceptions.getFromReplicaNotCouchbaseBucket());
                return completableFuture;
            }
            Duration ofMillis = Duration.ofMillis(100L);
            CompletableFuture<Stream<GetRequest>> completableFuture2 = new CompletableFuture<>();
            context.environment().timer().schedule(() -> {
                getAllReplicasRequests(core, collectionIdentifier, str, map, retryStrategy, duration.minus(ofMillis), requestSpan).whenComplete((stream, th) -> {
                    if (th != null) {
                        completableFuture2.completeExceptionally(th);
                    } else {
                        completableFuture2.complete(stream);
                    }
                });
            }, ofMillis);
            return completableFuture2;
        }
        int numberOfReplicas = ((CouchbaseBucketConfig) bucketConfig).numberOfReplicas();
        ArrayList arrayList = new ArrayList(numberOfReplicas + 1);
        GetRequest getRequest = new GetRequest(str, duration, context, collectionIdentifier, retryStrategy, environment.requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_GET, requestSpan));
        getRequest.context().clientContext(map);
        arrayList.add(getRequest);
        short s = 1;
        while (true) {
            short s2 = s;
            if (s2 > numberOfReplicas) {
                return CompletableFuture.completedFuture(arrayList.stream());
            }
            ReplicaGetRequest replicaGetRequest = new ReplicaGetRequest(str, duration, context, collectionIdentifier, retryStrategy, s2, environment.requestTracer().requestSpan(TracingIdentifiers.SPAN_REQUEST_KV_GET_REPLICA, requestSpan));
            replicaGetRequest.context().clientContext(map);
            arrayList.add(replicaGetRequest);
            s = (short) (s2 + 1);
        }
    }

    private static CompletableFuture<GetResponse> get(Core core, GetRequest getRequest) {
        core.send(getRequest);
        return getRequest.response().thenApply((Function<? super R, ? extends U>) getResponse -> {
            if (getResponse.status().success()) {
                return getResponse;
            }
            throw DefaultErrorUtil.keyValueStatusToException(getRequest, getResponse);
        }).whenComplete((getResponse2, th) -> {
            getRequest.context().logicallyComplete(th);
        });
    }
}
