package com.couchbase.client.core.cnc;

import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.deps.org.jctools.queues.MpscArrayQueue;
import com.couchbase.client.core.json.Mapper;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbThrowables;
import com.couchbase.client.core.util.NanoTimestamp;
import java.io.PrintStream;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/couchbase/client/core/cnc/DefaultEventBus.class */
public class DefaultEventBus implements EventBus {
    private static final int DEFAULT_QUEUE_CAPACITY = 16384;
    private static final Duration DEFAULT_IDLE_SLEEP_DURATION = Duration.ofMillis(100);
    private static final Duration DEFAULT_OVERFLOW_LOG_INTERVAL = Duration.ofSeconds(30);
    private final CopyOnWriteArraySet<Consumer<Event>> subscribers;
    private final Queue<Event> eventQueue;
    private final AtomicBoolean running;
    private final PrintStream errorLogging;
    private final String threadName;
    private final Duration idleSleepDuration;
    private final Duration overflowLogInterval;
    private final Scheduler scheduler;
    private volatile Thread runningThread;
    private volatile NanoTimestamp overflowLogTimestamp;
    private final Map<Class<? extends Event>, SampleEventAndCount> overflowInfo;

    /* loaded from: input_file:com/couchbase/client/core/cnc/DefaultEventBus$Builder.class */
    public static class Builder {
        private final Scheduler scheduler;
        private int queueCapacity = 16384;
        private Optional<PrintStream> errorLogging = Optional.of(System.err);
        private String threadName = "cb-events";
        private Duration idleSleepDuration = DefaultEventBus.DEFAULT_IDLE_SLEEP_DURATION;
        private Duration overflowLogInterval = DefaultEventBus.DEFAULT_OVERFLOW_LOG_INTERVAL;

        Builder(Scheduler scheduler) {
            this.scheduler = scheduler;
        }

        public Builder queueCapacity(int i) {
            this.queueCapacity = i;
            return this;
        }

        public Builder errorLogging(Optional<PrintStream> optional) {
            this.errorLogging = optional;
            return this;
        }

        public Builder threadName(String str) {
            this.threadName = str;
            return this;
        }

        public Builder idleSleepDuration(Duration duration) {
            this.idleSleepDuration = duration;
            return this;
        }

        public Builder overflowLogInterval(Duration duration) {
            this.overflowLogInterval = duration;
            return this;
        }

        public DefaultEventBus build() {
            return new DefaultEventBus(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/couchbase/client/core/cnc/DefaultEventBus$SampleEventAndCount.class */
    public static class SampleEventAndCount {
        private volatile Event event;
        private final AtomicLong count;

        private SampleEventAndCount(Event event) {
            this.count = new AtomicLong(1L);
            this.event = event;
        }

        public SampleEventAndCount updateAndIncrement(Event event) {
            this.event = event;
            this.count.incrementAndGet();
            return this;
        }

        public String toString() {
            return "{sampleEvent=" + this.event + ", totalDropCount=" + this.count + '}';
        }
    }

    public static Builder builder(Scheduler scheduler) {
        return new Builder(scheduler);
    }

    public static DefaultEventBus create(Scheduler scheduler) {
        return builder(scheduler).build();
    }

    private DefaultEventBus(Builder builder) {
        this.subscribers = new CopyOnWriteArraySet<>();
        this.running = new AtomicBoolean(false);
        this.overflowLogTimestamp = NanoTimestamp.never();
        this.overflowInfo = new ConcurrentHashMap();
        this.eventQueue = new MpscArrayQueue(builder.queueCapacity);
        this.scheduler = builder.scheduler;
        this.errorLogging = (PrintStream) builder.errorLogging.orElse(null);
        this.threadName = builder.threadName;
        this.idleSleepDuration = builder.idleSleepDuration;
        this.overflowLogInterval = builder.overflowLogInterval;
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public EventSubscription subscribe(Consumer<Event> consumer) {
        this.subscribers.add(consumer);
        return new EventSubscription(this, consumer);
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public void unsubscribe(EventSubscription eventSubscription) {
        this.subscribers.remove(eventSubscription.consumer());
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.couchbase.client.core.cnc.EventBus
    public EventBus.PublishResult publish(Event event) {
        if (!isRunning()) {
            return EventBus.PublishResult.SHUTDOWN;
        }
        if (this.eventQueue.offer(event)) {
            return EventBus.PublishResult.SUCCESS;
        }
        if (this.errorLogging != null) {
            try {
                this.overflowInfo.compute(event.getClass(), (cls, sampleEventAndCount) -> {
                    return sampleEventAndCount == null ? new SampleEventAndCount(event) : sampleEventAndCount.updateAndIncrement(event);
                });
            } catch (Exception e) {
            }
        }
        return EventBus.PublishResult.OVERLOADED;
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public Mono<Void> start() {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(false, true)) {
                this.runningThread = new Thread(() -> {
                    long millis = this.idleSleepDuration.toMillis();
                    long j = 0;
                    while (true) {
                        if (!isRunning() && this.eventQueue.isEmpty()) {
                            return;
                        }
                        Event poll = this.eventQueue.poll();
                        while (poll != null) {
                            Iterator<Consumer<Event>> it = this.subscribers.iterator();
                            while (it.hasNext()) {
                                try {
                                    it.next().accept(poll);
                                } catch (Throwable th) {
                                    if (this.errorLogging != null) {
                                        this.errorLogging.println("Exception caught in EventBus Consumer: " + th);
                                        th.printStackTrace();
                                    }
                                }
                            }
                            poll = this.eventQueue.poll();
                            long j2 = j + 1;
                            j = j2;
                            if (j2 == 10000) {
                                maybePrintOverflow();
                                j = 0;
                            }
                        }
                        maybePrintOverflow();
                        try {
                            if (isRunning()) {
                                Thread.sleep(millis);
                            }
                        } catch (InterruptedException e) {
                        }
                    }
                });
                this.runningThread.setDaemon(true);
                this.runningThread.setName(this.threadName);
                this.runningThread.start();
            }
            return Mono.empty();
        });
    }

    private void maybePrintOverflow() {
        try {
            if (this.errorLogging != null && !this.overflowInfo.isEmpty() && this.overflowLogTimestamp.hasElapsed(this.overflowLogInterval)) {
                HashMap hashMap = new HashMap();
                Iterator<Map.Entry<Class<? extends Event>, SampleEventAndCount>> it = this.overflowInfo.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Class<? extends Event>, SampleEventAndCount> next = it.next();
                    hashMap.put(next.getKey().getSimpleName(), CbCollections.mapOf("sampleEvent", next.getValue().event.toString(), "totalDropCount", next.getValue().count));
                    it.remove();
                }
                this.errorLogging.println("Some events could not be published because the queue was (likely temporarily) over capacity: " + Mapper.encodeAsString(hashMap));
                this.overflowInfo.clear();
                this.overflowLogTimestamp = NanoTimestamp.now();
            }
        } catch (Exception e) {
            this.errorLogging.println("Encountered an error while processing the overflow queue - this is a bug: " + CbThrowables.getStackTraceAsString(e));
            this.overflowInfo.clear();
        }
    }

    @Override // com.couchbase.client.core.cnc.EventBus
    public Mono<Void> stop(Duration duration) {
        return Mono.defer(() -> {
            if (this.running.compareAndSet(true, false)) {
                this.runningThread.interrupt();
            }
            this.overflowInfo.clear();
            return Mono.empty();
        }).then(Flux.interval(Duration.ofMillis(10L), this.scheduler).takeUntil(l -> {
            return !this.runningThread.isAlive();
        }).then()).timeout(duration, this.scheduler);
    }

    boolean isRunning() {
        return this.running.get();
    }

    boolean hasSubscribers() {
        return !this.subscribers.isEmpty();
    }
}
