package net.openhft.chronicle.engine.api.query;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.core.time.SystemTimeProvider;
import net.openhft.chronicle.engine.api.pubsub.ConsumingSubscriber;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.QueueView;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import net.openhft.chronicle.queue.impl.RollingChronicleQueue;
import net.openhft.chronicle.wire.DefaultValueIn;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.KeyedMarshallable;
import net.openhft.chronicle.wire.Marshallable;
import net.openhft.chronicle.wire.MessageHistory;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/openhft/chronicle/engine/api/query/VanillaIndexQueueView.class */
public class VanillaIndexQueueView<V extends Marshallable> implements IndexQueueView<ConsumingSubscriber<IndexedValue<V>>, V> {
    private static final Logger LOG;
    private static final Iterator EMPTY_ITERATOR;
    private final ChronicleQueue chronicleQueue;

    @Nullable
    private final TypeToString typeToString;

    @NotNull
    private final Asset asset;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<String, ConcurrentMap<Object, IndexedValue<V>>> multiMap = new ConcurrentHashMap();
    private final Map<Subscriber<IndexedValue<V>>, AtomicBoolean> activeSubscriptions = new ConcurrentHashMap();
    private final AtomicBoolean isClosed = new AtomicBoolean();
    private final Object lastIndexLock = new Object();
    private final ThreadLocal<IndexedValue<V>> indexedValue = ThreadLocal.withInitial(IndexedValue::new);
    private volatile long lastIndexRead = 0;
    private long lastSecond = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
    private long messagesReadPerSecond = 0;

    @NotNull
    private ConcurrentMap<Bytes, BytesStore> bytesToKey = new ConcurrentHashMap();

    public VanillaIndexQueueView(@NotNull RequestContext requestContext, @NotNull Asset asset, @NotNull QueueView<?, V> queueView) {
        this.asset = asset;
        EventLoop eventLoop = (EventLoop) asset.acquireView(EventLoop.class);
        this.chronicleQueue = ((ChronicleQueueView) queueView).chronicleQueue();
        ExcerptTailer createTailer = this.chronicleQueue.createTailer();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.typeToString = (TypeToString) asset.root().findView(TypeToString.class);
        eventLoop.addHandler(() -> {
            Class<? extends Marshallable> type;
            if (!atomicBoolean.get()) {
                RollingChronicleQueue rollingChronicleQueue = (RollingChronicleQueue) this.chronicleQueue;
                boolean moveToIndex = createTailer.moveToIndex(rollingChronicleQueue.rollCycle().toIndex(rollingChronicleQueue.cycle(), 0L));
                atomicBoolean.set(moveToIndex);
                if (!moveToIndex) {
                    return false;
                }
            }
            long seconds = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
            if (seconds >= this.lastSecond + 10) {
                this.lastSecond = seconds;
                LOG.info("messages read per second=" + (this.messagesReadPerSecond / 10));
                this.messagesReadPerSecond = 0L;
            }
            if (this.isClosed.get()) {
                throw new InvalidEventHandlerException();
            }
            DocumentContext readingDocument = createTailer.readingDocument();
            Throwable th = null;
            try {
                if (!readingDocument.isPresent()) {
                    return false;
                }
                long readPosition = readingDocument.wire().bytes().readPosition();
                while (true) {
                    try {
                        readingDocument.wire().consumePadding();
                        if (readingDocument.wire().bytes().readRemaining() == 0) {
                            if (readingDocument != null) {
                                if (0 != 0) {
                                    try {
                                        readingDocument.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    readingDocument.close();
                                }
                            }
                            return true;
                        }
                        StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
                        ValueIn read = readingDocument.wire().read(acquireStringBuilder);
                        if ("history".contentEquals(acquireStringBuilder)) {
                            read.marshallable(MessageHistory.get());
                            if (readingDocument != null) {
                                if (0 != 0) {
                                    try {
                                        readingDocument.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    readingDocument.close();
                                }
                            }
                            return true;
                        }
                        if (acquireStringBuilder.length() != 0 && (type = this.typeToString.toType(acquireStringBuilder)) != null) {
                            Marshallable apply = VanillaObjectCacheFactory.INSTANCE.get().apply(type);
                            long readPosition2 = readingDocument.wire().bytes().readPosition();
                            try {
                                read.marshallable(apply);
                                if (apply instanceof KeyedMarshallable) {
                                    Bytes acquireBytes = Wires.acquireBytes();
                                    ((KeyedMarshallable) apply).writeKey(acquireBytes);
                                    BytesStore computeIfAbsent = this.bytesToKey.computeIfAbsent(acquireBytes, (v0) -> {
                                        return v0.copy();
                                    });
                                    if (computeIfAbsent != null) {
                                        this.messagesReadPerSecond++;
                                        String sb = acquireStringBuilder.toString();
                                        synchronized (this.lastIndexLock) {
                                            this.multiMap.computeIfAbsent(sb, str -> {
                                                return new ConcurrentHashMap();
                                            }).compute(computeIfAbsent, (obj, indexedValue) -> {
                                                if (indexedValue == null) {
                                                    return new IndexedValue(Wires.deepCopy(apply), readingDocument.index());
                                                }
                                                Wires.copyTo(apply, indexedValue.v());
                                                indexedValue.index(readingDocument.index());
                                                return indexedValue;
                                            });
                                            this.lastIndexRead = readingDocument.index();
                                        }
                                    }
                                } else {
                                    continue;
                                }
                            } catch (Exception e) {
                                LOG.error("Error passing " + apply.getClass().getSimpleName() + " bytes:\n" + readingDocument.wire().bytes().toHexString(readPosition2, readingDocument.wire().bytes().readLimit() - readPosition2), e);
                                if (readingDocument != null) {
                                    if (0 != 0) {
                                        try {
                                            readingDocument.close();
                                        } catch (Throwable th4) {
                                            th.addSuppressed(th4);
                                        }
                                    } else {
                                        readingDocument.close();
                                    }
                                }
                                return false;
                            }
                        }
                    } catch (RuntimeException e2) {
                        Jvm.warn().on(getClass(), Wires.fromSizePrefixedBlobs(readingDocument.wire().bytes(), readPosition - 4), e2);
                        if (readingDocument == null) {
                            return true;
                        }
                        if (0 == 0) {
                            readingDocument.close();
                            return true;
                        }
                        try {
                            readingDocument.close();
                            return true;
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                            return true;
                        }
                    }
                }
            } finally {
                if (readingDocument != null) {
                    if (0 != 0) {
                        try {
                            readingDocument.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        readingDocument.close();
                    }
                }
            }
        });
    }

    @Override // net.openhft.chronicle.engine.api.query.IndexQueueView
    public void registerSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber, @NotNull IndexQuery<V> indexQuery) {
        ExcerptTailer createTailer = this.chronicleQueue.createTailer();
        long index = createTailer.toStart().index();
        long index2 = createTailer.toEnd().index();
        long fromIndex = indexQuery.fromIndex();
        if (fromIndex == -1) {
            RollCycle rollCycle = ((RollingChronicleQueue) this.chronicleQueue).rollCycle();
            fromIndex = rollCycle.toIndex(rollCycle.toCycle(rollCycle.current(SystemTimeProvider.INSTANCE, 0L)), 0L);
        } else if (fromIndex == 0) {
            fromIndex = index2;
        }
        long max = Math.max(Math.min(fromIndex, index2), index);
        boolean moveToIndex = createTailer.moveToIndex(max);
        if (!$assertionsDisabled && !moveToIndex && max != index2) {
            throw new AssertionError("fromIndex=" + Long.toHexString(max) + ", start=" + Long.toHexString(index) + ",end=" + Long.toHexString(index2));
        }
        if (max <= index2) {
            registerSubscriber(consumingSubscriber, indexQuery, createTailer, max);
        } else {
            ensureAllDataIsLoadedBeforeRegistingSubsribe(consumingSubscriber, indexQuery, createTailer, index2, max);
        }
    }

    private void ensureAllDataIsLoadedBeforeRegistingSubsribe(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber, @NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, long j, long j2) {
        ((EventLoop) this.asset.root().getView(EventLoop.class)).addHandler(() -> {
            return endOfTailCheckedRegisterSubscriber(consumingSubscriber, indexQuery, excerptTailer, j, j2);
        });
    }

    private boolean endOfTailCheckedRegisterSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber, @NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, long j, long j2) throws InvalidEventHandlerException {
        if (this.lastIndexRead > j) {
            return false;
        }
        registerSubscriber(consumingSubscriber, indexQuery, excerptTailer, j2);
        throw new InvalidEventHandlerException();
    }

    private void registerSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber, @NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, long j) {
        this.activeSubscriptions.put(consumingSubscriber, new AtomicBoolean());
        String eventName = indexQuery.eventName();
        Predicate<V> filter = indexQuery.filter();
        try {
            consumingSubscriber.addSupplier(excerptConsumer(indexQuery, excerptTailer, indexQuery.bootstrap() ? this.multiMap.computeIfAbsent(eventName, str -> {
                return new ConcurrentHashMap();
            }).values().stream().filter(indexedValue -> {
                return indexedValue.index() < j && filter.test(indexedValue.v());
            }).iterator() : EMPTY_ITERATOR, j));
        } catch (RuntimeException e) {
            consumingSubscriber.onEndOfSubscription();
            Jvm.warn().on(getClass(), "Error registering subscription", e);
        }
    }

    @NotNull
    private Supplier<Marshallable> excerptConsumer(@NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, @NotNull Iterator<IndexedValue<V>> it, long j) {
        return () -> {
            return value(indexQuery, excerptTailer, it, j);
        };
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Nullable
    private Marshallable value(@NotNull IndexQuery<V> indexQuery, @NotNull ExcerptTailer excerptTailer, @NotNull Iterator<IndexedValue<V>> it, long j) {
        boolean isPresent;
        boolean hasMore;
        if (it.hasNext()) {
            IndexedValue<V> next = it.next();
            next.timePublished(System.currentTimeMillis());
            next.maxIndex(this.lastIndexRead);
            return next;
        }
        String eventName = indexQuery.eventName();
        Predicate<V> filter = indexQuery.filter();
        if (this.isClosed.get()) {
            throw Jvm.rethrow(new InvalidEventHandlerException("shutdown"));
        }
        DocumentContext readingDocument = excerptTailer.readingDocument();
        Throwable th = null;
        try {
            try {
                if (!readingDocument.isPresent()) {
                    if (isPresent) {
                        while (true) {
                            if (!hasMore) {
                                break;
                            }
                        }
                    }
                    return null;
                }
                if (LOG.isDebugEnabled()) {
                    Jvm.debug().on(getClass(), "processing the following message=" + Wires.fromSizePrefixedBlobs(readingDocument));
                }
                if (j > readingDocument.index()) {
                    if (readingDocument.isPresent()) {
                        while (readingDocument.wire().hasMore()) {
                            readingDocument.wire().read().skipValue();
                        }
                    }
                    if (readingDocument != null) {
                        if (0 != 0) {
                            try {
                                readingDocument.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            readingDocument.close();
                        }
                    }
                    return null;
                }
                Class<? extends Marshallable> type = this.typeToString.toType(eventName);
                if (type == null) {
                    if (readingDocument.isPresent()) {
                        while (readingDocument.wire().hasMore()) {
                            readingDocument.wire().read().skipValue();
                        }
                    }
                    if (readingDocument != null) {
                        if (0 != 0) {
                            try {
                                readingDocument.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            readingDocument.close();
                        }
                    }
                    return null;
                }
                ValueIn read = readingDocument.wire().read(eventName);
                if (read instanceof DefaultValueIn) {
                    if (readingDocument.isPresent()) {
                        while (readingDocument.wire().hasMore()) {
                            readingDocument.wire().read().skipValue();
                        }
                    }
                    if (readingDocument != null) {
                        if (0 != 0) {
                            try {
                                readingDocument.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            readingDocument.close();
                        }
                    }
                    return null;
                }
                Marshallable apply = VanillaObjectCacheFactory.INSTANCE.get().apply(type);
                read.marshallable(apply);
                if (!filter.test(apply)) {
                    if (readingDocument.isPresent()) {
                        while (readingDocument.wire().hasMore()) {
                            readingDocument.wire().read().skipValue();
                        }
                    }
                    if (readingDocument != null) {
                        if (0 != 0) {
                            try {
                                readingDocument.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            readingDocument.close();
                        }
                    }
                    return null;
                }
                IndexedValue<V> indexedValue = this.indexedValue.get();
                indexedValue.index(readingDocument.index());
                indexedValue.v(apply);
                indexedValue.timePublished(System.currentTimeMillis());
                indexedValue.maxIndex(Math.max(readingDocument.index(), this.lastIndexRead));
                if (readingDocument.isPresent()) {
                    while (readingDocument.wire().hasMore()) {
                        readingDocument.wire().read().skipValue();
                    }
                }
                if (readingDocument != null) {
                    if (0 != 0) {
                        try {
                            readingDocument.close();
                        } catch (Throwable th6) {
                            th.addSuppressed(th6);
                        }
                    } else {
                        readingDocument.close();
                    }
                }
                return indexedValue;
            } finally {
                if (readingDocument.isPresent()) {
                    while (readingDocument.wire().hasMore()) {
                        readingDocument.wire().read().skipValue();
                    }
                }
            }
        } finally {
            if (readingDocument != null) {
                if (0 != 0) {
                    try {
                        readingDocument.close();
                    } catch (Throwable th7) {
                        th.addSuppressed(th7);
                    }
                } else {
                    readingDocument.close();
                }
            }
        }
    }

    @Override // net.openhft.chronicle.engine.api.query.IndexQueueView
    public void unregisterSubscriber(@NotNull ConsumingSubscriber<IndexedValue<V>> consumingSubscriber) {
        AtomicBoolean remove = this.activeSubscriptions.remove(consumingSubscriber);
        if (remove != null) {
            remove.set(true);
        }
    }

    @Override // net.openhft.chronicle.core.io.Closeable, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.isClosed.set(true);
        this.activeSubscriptions.values().forEach(atomicBoolean -> {
            atomicBoolean.set(true);
        });
        this.chronicleQueue.close();
    }

    static {
        $assertionsDisabled = !VanillaIndexQueueView.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(VanillaIndexQueueView.class);
        EMPTY_ITERATOR = Collections.EMPTY_LIST.iterator();
    }
}
