/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.impl.NatsMessage;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Predicate;

class MessageQueue {
    private static final int STOPPED = 0;
    private static final int RUNNING = 1;
    private static final int DRAINING = 2;
    private final AtomicLong length;
    private final AtomicLong sizeInBytes;
    private final AtomicInteger running;
    private final boolean singleThreadedReader;
    private final ConcurrentLinkedQueue<NatsMessage> queue = new ConcurrentLinkedQueue();
    private final ConcurrentLinkedQueue<Thread> waiters;
    public static final int MAX_SPINS = 200;
    public static final int SPIN_WAIT = 50;
    public static final int MAX_SPIN_TIME = 10000;

    MessageQueue(boolean singleReaderMode) {
        this.running = new AtomicInteger(1);
        this.sizeInBytes = new AtomicLong(0L);
        this.length = new AtomicLong(0L);
        this.waiters = new ConcurrentLinkedQueue();
        this.singleThreadedReader = singleReaderMode;
    }

    boolean isSingleReaderMode() {
        return this.singleThreadedReader;
    }

    boolean isRunning() {
        return this.running.get() != 0;
    }

    boolean isDraining() {
        return this.running.get() == 2;
    }

    void pause() {
        this.running.set(0);
        this.signalAll();
    }

    void resume() {
        this.running.set(1);
        this.signalAll();
    }

    void drain() {
        this.running.set(2);
        this.signalAll();
    }

    boolean isDrained() {
        return this.running.get() == 2 && this.length() == 0L;
    }

    void signalOne() {
        Thread t = this.waiters.poll();
        if (t != null) {
            LockSupport.unpark(t);
        }
    }

    void signalIfNotEmpty() {
        if (this.length.get() > 0L) {
            this.signalOne();
        }
    }

    void signalAll() {
        Thread t = this.waiters.poll();
        while (t != null) {
            LockSupport.unpark(t);
            t = this.waiters.poll();
        }
    }

    void push(NatsMessage msg) {
        this.queue.add(msg);
        this.sizeInBytes.getAndAdd(msg.getSizeInBytes());
        this.length.incrementAndGet();
        this.signalOne();
    }

    NatsMessage waitForTimeout(Duration timeout) throws InterruptedException {
        long timeoutNanos = timeout != null ? timeout.toNanos() : -1L;
        NatsMessage retVal = null;
        if (timeoutNanos >= 0L) {
            Thread t = Thread.currentThread();
            long start = System.nanoTime();
            if (timeoutNanos > 10000L) {
                for (int count = 0; this.isRunning() && (retVal = this.queue.poll()) == null && count < 200 && !this.isDraining(); ++count) {
                    LockSupport.parkNanos(50L);
                }
            }
            if (retVal != null) {
                return retVal;
            }
            long now = start;
            while (this.isRunning() && (retVal = this.queue.poll()) == null && !this.isDraining() && (timeoutNanos <= 0L || (timeoutNanos -= (now = System.nanoTime()) - (start = now)) > 0L)) {
                this.waiters.add(t);
                if (timeoutNanos == 0L) {
                    LockSupport.park();
                } else {
                    LockSupport.parkNanos(timeoutNanos);
                }
                this.waiters.remove(t);
                if (!Thread.interrupted()) continue;
                throw new InterruptedException("Interrupted during timeout");
            }
        }
        return retVal;
    }

    NatsMessage pop(Duration timeout) throws InterruptedException {
        if (!this.isRunning()) {
            return null;
        }
        NatsMessage retVal = this.queue.poll();
        if (retVal == null && timeout != null) {
            retVal = this.waitForTimeout(timeout);
        }
        if (retVal != null) {
            this.sizeInBytes.getAndAdd(-retVal.getSizeInBytes());
            this.length.decrementAndGet();
            this.signalIfNotEmpty();
        }
        return retVal;
    }

    NatsMessage accumulate(long maxSize, long maxMessages, Duration timeout) throws InterruptedException {
        NatsMessage next;
        if (!this.singleThreadedReader) {
            throw new IllegalStateException("Accumulate is only supported in single reader mode.");
        }
        if (!this.isRunning()) {
            return null;
        }
        NatsMessage msg = this.queue.poll();
        if (msg == null) {
            msg = this.waitForTimeout(timeout);
            if (!this.isRunning() || msg == null) {
                return null;
            }
        }
        long size = msg.getSizeInBytes();
        if (maxMessages <= 1L || size >= maxSize) {
            this.sizeInBytes.addAndGet(-size);
            this.length.decrementAndGet();
            this.signalIfNotEmpty();
            return msg;
        }
        long count = 1L;
        NatsMessage cursor = msg;
        while (cursor != null && (next = this.queue.peek()) != null) {
            long s = next.getSizeInBytes();
            if (maxSize >= 0L && size + s >= maxSize) break;
            size += s;
            cursor = cursor.next = this.queue.poll();
            if (++count != maxMessages) continue;
            break;
        }
        this.sizeInBytes.addAndGet(-size);
        this.length.addAndGet(-count);
        this.signalIfNotEmpty();
        return msg;
    }

    NatsMessage popNow() throws InterruptedException {
        return this.pop(null);
    }

    long length() {
        return this.length.get();
    }

    long sizeInBytes() {
        return this.sizeInBytes.get();
    }

    void filter(Predicate<NatsMessage> p) {
        if (this.isRunning()) {
            throw new IllegalStateException("Filter is only supported when the queue is paused");
        }
        ConcurrentLinkedQueue<NatsMessage> newQueue = new ConcurrentLinkedQueue<NatsMessage>();
        NatsMessage cursor = this.queue.poll();
        while (cursor != null) {
            if (!p.test(cursor)) {
                newQueue.add(cursor);
            } else {
                this.sizeInBytes.addAndGet(-cursor.getSizeInBytes());
                this.length.decrementAndGet();
            }
            cursor = this.queue.poll();
        }
        this.queue.addAll(newQueue);
    }
}

