package org.apache.ignite.internal.processors.cache.query.continuous;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.LongUnaryOperator;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.SystemProperty;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.class */
public class CacheContinuousQueryEventBuffer {
    public static final int DFLT_CONTINUOUS_QUERY_PENDING_BUFF_SIZE = 10000;
    public static final int DFLT_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE = 1000;

    @SystemProperty(value = "The max size of the buffer with pending continuous queries events", type = Long.class, defaults = "10000")
    public static final String IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE = "IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE";

    @SystemProperty(value = "Continuous queries batch buffer size", type = Long.class, defaults = "1000")
    public static final String IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE = "IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE";
    public static final int MAX_PENDING_BUFF_SIZE;
    private static final int BUF_SIZE;
    private static final Object RETRY;
    private final IgniteLogger log;
    private final LongUnaryOperator currPartCntr;
    private final AtomicReference<Batch> curBatch;
    private final ReentrantReadWriteLock backupQueueGuard;
    private final Deque<CacheContinuousQueryEntry> backupQ;
    private final ConcurrentSkipListMap<Long, CacheContinuousQueryEntry> pending;
    private final AtomicInteger pendingCurrSize;
    final GridAtomicLong maxReceivedBackupAckUpdCntr;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer$Batch.class */
    public class Batch {
        private long filtered;
        private final long startCntr;
        private final long endCntr;
        private int lastProc = -1;
        private CacheContinuousQueryEntry[] entries;
        static final /* synthetic */ boolean $assertionsDisabled;

        Batch(long j, long j2, CacheContinuousQueryEntry[] cacheContinuousQueryEntryArr) {
            if (!$assertionsDisabled && j < 0) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && j2 < 0) {
                throw new AssertionError();
            }
            this.startCntr = j;
            this.filtered = j2;
            this.entries = cacheContinuousQueryEntryArr;
            this.endCntr = (j + CacheContinuousQueryEventBuffer.BUF_SIZE) - 1;
        }

        synchronized Map<Long, CacheContinuousQueryEntry> flushCurrentEntries(BiFunction<Long, Long, CacheContinuousQueryEntry> biFunction) {
            if (this.entries == null || biFunction == null) {
                return Collections.emptyMap();
            }
            HashMap hashMap = new HashMap();
            long j = this.filtered;
            long j2 = this.startCntr;
            for (int i = 0; i < this.entries.length; i++) {
                CacheContinuousQueryEntry cacheContinuousQueryEntry = this.entries[i];
                CacheContinuousQueryEntry cacheContinuousQueryEntry2 = null;
                if (cacheContinuousQueryEntry == null) {
                    if (j != 0) {
                        cacheContinuousQueryEntry2 = biFunction.apply(Long.valueOf(j2 - 1), Long.valueOf(j - 1));
                        j = 0;
                    }
                } else if (cacheContinuousQueryEntry.isFiltered()) {
                    j++;
                } else {
                    cacheContinuousQueryEntry2 = new CacheContinuousQueryEntry(cacheContinuousQueryEntry.cacheId(), cacheContinuousQueryEntry.eventType(), cacheContinuousQueryEntry.key(), cacheContinuousQueryEntry.newValue(), cacheContinuousQueryEntry.oldValue(), cacheContinuousQueryEntry.isKeepBinary(), cacheContinuousQueryEntry.partition(), cacheContinuousQueryEntry.updateCounter(), cacheContinuousQueryEntry.topologyVersion(), cacheContinuousQueryEntry.flags());
                    cacheContinuousQueryEntry2.filteredCount(j);
                    j = 0;
                }
                if (cacheContinuousQueryEntry2 != null) {
                    hashMap.put(Long.valueOf(cacheContinuousQueryEntry2.updateCounter()), cacheContinuousQueryEntry2);
                }
                j2++;
            }
            if (j != 0) {
                CacheContinuousQueryEntry apply = biFunction.apply(Long.valueOf(j2 - 1), Long.valueOf(j - 1));
                hashMap.put(Long.valueOf(apply.updateCounter()), apply);
            }
            return hashMap;
        }

        @Nullable
        private Object processEntry0(@Nullable Object obj, long j, CacheContinuousQueryEntry cacheContinuousQueryEntry, boolean z) {
            CacheContinuousQueryEntry cacheContinuousQueryEntry2;
            int i = (int) (j - this.startCntr);
            synchronized (this) {
                if (this.entries == null) {
                    return CacheContinuousQueryEventBuffer.RETRY;
                }
                this.entries[i] = cacheContinuousQueryEntry.copyWithDataReset();
                int i2 = this.lastProc + 1;
                long j2 = CacheContinuousQueryEventBuffer.this.maxReceivedBackupAckUpdCntr.get();
                if (i2 == i) {
                    for (int i3 = i2; i3 < this.entries.length && (cacheContinuousQueryEntry2 = this.entries[i3]) != null; i3++) {
                        if (cacheContinuousQueryEntry2.isFiltered()) {
                            this.filtered++;
                        } else {
                            cacheContinuousQueryEntry2.filteredCount(this.filtered);
                            this.filtered = 0L;
                            obj = CacheContinuousQueryEventBuffer.this.addResult(obj, cacheContinuousQueryEntry2, z);
                            this.entries[i3] = null;
                        }
                        i = i3;
                    }
                    this.lastProc = i;
                    if (i == this.entries.length - 1) {
                        rollOver(this.startCntr + CacheContinuousQueryEventBuffer.BUF_SIZE, this.filtered);
                    }
                } else if (this.endCntr < j2) {
                    rollOver(j2 + 1, 0L);
                }
                return obj;
            }
        }

        private synchronized Object tryRollOver() {
            if (this.entries == null) {
                return CacheContinuousQueryEventBuffer.RETRY;
            }
            long j = CacheContinuousQueryEventBuffer.this.maxReceivedBackupAckUpdCntr.get();
            if (this.endCntr >= j) {
                return null;
            }
            rollOver(j + 1, 0L);
            return CacheContinuousQueryEventBuffer.RETRY;
        }

        private void rollOver(long j, long j2) {
            Arrays.fill(this.entries, (Object) null);
            Batch batch = new Batch(j, j2, this.entries);
            this.entries = null;
            boolean compareAndSet = CacheContinuousQueryEventBuffer.this.curBatch.compareAndSet(this, batch);
            if (!$assertionsDisabled && !compareAndSet) {
                throw new AssertionError();
            }
        }

        static {
            $assertionsDisabled = !CacheContinuousQueryEventBuffer.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CacheContinuousQueryEventBuffer(LongUnaryOperator longUnaryOperator, IgniteLogger igniteLogger) {
        this.curBatch = new AtomicReference<>();
        this.backupQueueGuard = new ReentrantReadWriteLock();
        this.backupQ = new ConcurrentLinkedDeque();
        this.pending = new ConcurrentSkipListMap<>();
        this.pendingCurrSize = new AtomicInteger();
        this.maxReceivedBackupAckUpdCntr = new GridAtomicLong(0L);
        this.currPartCntr = longUnaryOperator;
        this.log = igniteLogger;
    }

    CacheContinuousQueryEventBuffer(IgniteLogger igniteLogger) {
        this(j -> {
            return 0L;
        }, igniteLogger);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void cleanupOnAck(long j) {
        this.backupQueueGuard.writeLock().lock();
        try {
            if (this.maxReceivedBackupAckUpdCntr.setIfGreater(j)) {
                this.backupQ.removeIf(cacheContinuousQueryEntry -> {
                    return cacheContinuousQueryEntry.updateCounter() <= j;
                });
            }
        } finally {
            this.backupQueueGuard.writeLock().unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public Collection<CacheContinuousQueryEntry> flushOnExchange(BiFunction<Long, Long, CacheContinuousQueryEntry> biFunction) {
        CacheContinuousQueryEntry pollFirst;
        TreeMap treeMap = new TreeMap();
        int size = this.backupQ.size();
        for (int i = 0; i < size && (pollFirst = this.backupQ.pollFirst()) != null; i++) {
            treeMap.put(Long.valueOf(pollFirst.updateCounter()), pollFirst);
        }
        Batch batch = this.curBatch.get();
        if (batch != null) {
            treeMap.putAll(batch.flushCurrentEntries(biFunction));
        }
        for (CacheContinuousQueryEntry cacheContinuousQueryEntry : this.pending.values()) {
            treeMap.put(Long.valueOf(cacheContinuousQueryEntry.updateCounter()), cacheContinuousQueryEntry);
        }
        if (treeMap.isEmpty()) {
            return null;
        }
        return treeMap.values();
    }

    long currentFiltered() {
        Batch batch = this.curBatch.get();
        if (batch != null) {
            return batch.filtered;
        }
        return 0L;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nullable
    public Object processEntry(CacheContinuousQueryEntry cacheContinuousQueryEntry, boolean z) {
        return process0(cacheContinuousQueryEntry.updateCounter(), cacheContinuousQueryEntry, z);
    }

    /* JADX WARN: Code restructure failed: missing block: B:21:0x0083, code lost:
    
        r7.pendingCurrSize.incrementAndGet();
        r7.pending.put(java.lang.Long.valueOf(r8), r10);
     */
    /* JADX WARN: Code restructure failed: missing block: B:22:0x00a2, code lost:
    
        if (r7.pendingCurrSize.get() <= org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE) goto L58;
     */
    /* JADX WARN: Code restructure failed: missing block: B:23:0x00a5, code lost:
    
        r0 = r7.pending;
     */
    /* JADX WARN: Code restructure failed: missing block: B:24:0x00ac, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x00b7, code lost:
    
        if (r7.pendingCurrSize.get() > org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE) goto L40;
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x00bc, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:31:0x00c0, code lost:
    
        org.apache.ignite.internal.util.typedef.internal.LT.warn(r7.log, "Buffer for pending events reached max of its size [cacheId=" + r10.cacheId() + ", maxSize=" + org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.MAX_PENDING_BUFF_SIZE + ", partId=" + r10.partition() + "]");
        r15 = org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.BUF_SIZE;
        r0 = r7.pending.entrySet().iterator();
     */
    /* JADX WARN: Code restructure failed: missing block: B:33:0x00f1, code lost:
    
        if (r0.hasNext() == false) goto L75;
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x00f6, code lost:
    
        if (r15 <= 0) goto L76;
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x00f9, code lost:
    
        r0 = r0.next().getValue();
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x010f, code lost:
    
        if (r11 != false) goto L78;
     */
    /* JADX WARN: Code restructure failed: missing block: B:38:0x0112, code lost:
    
        r13 = addResult(r13, r0.copyWithDataReset(), r11);
     */
    /* JADX WARN: Code restructure failed: missing block: B:40:0x0121, code lost:
    
        r0.remove();
        r7.pendingCurrSize.decrementAndGet();
        r15 = r15 - 1;
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x0138, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:69:0x0031, code lost:
    
        if (r11 == false) goto L16;
     */
    /* JADX WARN: Code restructure failed: missing block: B:70:0x0034, code lost:
    
        addToBackupQueue(r10);
     */
    /* JADX WARN: Code restructure failed: missing block: B:72:0x003b, code lost:
    
        if (r11 == false) goto L19;
     */
    /* JADX WARN: Code restructure failed: missing block: B:73:0x003e, code lost:
    
        return null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:75:0x0043, code lost:
    
        return r10;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private java.lang.Object process0(long r8, org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry r10, boolean r11) {
        /*
            Method dump skipped, instructions count: 380
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEventBuffer.process0(long, org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry, boolean):java.lang.Object");
    }

    private Batch initBatch(boolean z) {
        while (this.curBatch.get() == null) {
            long applyAsLong = this.currPartCntr.applyAsLong(z ? 1L : 0L);
            if (applyAsLong == -1) {
                return null;
            }
            this.curBatch.compareAndSet(null, new Batch(applyAsLong + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE]));
        }
        return this.curBatch.get();
    }

    @Nullable
    private Object processPending(@Nullable Object obj, Batch batch, boolean z) {
        Object obj2;
        if (this.pending.floorKey(Long.valueOf(batch.endCntr)) == null) {
            return obj;
        }
        synchronized (this.pending) {
            for (Map.Entry entry : this.pending.headMap((ConcurrentSkipListMap<Long, CacheContinuousQueryEntry>) Long.valueOf(batch.endCntr), true).entrySet()) {
                long longValue = ((Long) entry.getKey()).longValue();
                if (!$assertionsDisabled && longValue > batch.endCntr) {
                    throw new AssertionError();
                }
                if (this.pending.remove(Long.valueOf(longValue)) != null) {
                    obj = longValue < batch.startCntr ? addResult(obj, (CacheContinuousQueryEntry) entry.getValue(), z) : batch.processEntry0(obj, ((Long) entry.getKey()).longValue(), (CacheContinuousQueryEntry) entry.getValue(), z);
                    this.pendingCurrSize.decrementAndGet();
                }
            }
            obj2 = obj;
        }
        return obj2;
    }

    @Nullable
    private Object addResult(@Nullable Object obj, CacheContinuousQueryEntry cacheContinuousQueryEntry, boolean z) {
        List list;
        if (obj == null) {
            if (z) {
                addToBackupQueue(cacheContinuousQueryEntry);
            } else {
                obj = cacheContinuousQueryEntry;
            }
        } else {
            if (!$assertionsDisabled && z) {
                throw new AssertionError();
            }
            if (obj instanceof CacheContinuousQueryEntry) {
                list = new ArrayList();
                list.add((CacheContinuousQueryEntry) obj);
            } else {
                if (!$assertionsDisabled && !(obj instanceof List)) {
                    throw new AssertionError(obj);
                }
                list = (List) obj;
            }
            list.add(cacheContinuousQueryEntry);
            obj = list;
        }
        return obj;
    }

    int backupQueueSize() {
        return this.backupQ.size();
    }

    CacheContinuousQueryEntry[] bufferedEntries() {
        Batch batch = this.curBatch.get();
        if (batch == null) {
            return null;
        }
        return batch.entries;
    }

    private void addToBackupQueue(CacheContinuousQueryEntry cacheContinuousQueryEntry) {
        this.backupQueueGuard.readLock().lock();
        try {
            if (cacheContinuousQueryEntry.updateCounter() > this.maxReceivedBackupAckUpdCntr.get()) {
                this.backupQ.add(cacheContinuousQueryEntry);
            }
        } finally {
            this.backupQueueGuard.readLock().unlock();
        }
    }

    static {
        $assertionsDisabled = !CacheContinuousQueryEventBuffer.class.desiredAssertionStatus();
        MAX_PENDING_BUFF_SIZE = IgniteSystemProperties.getInteger(IGNITE_CONTINUOUS_QUERY_PENDING_BUFF_SIZE, 10000);
        BUF_SIZE = IgniteSystemProperties.getInteger(IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE, 1000);
        RETRY = new Object();
    }
}
