package net.openhft.chronicle.engine.server.internal;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.SubscriptionCollection;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.server.internal.ObjectKVSubscriptionHandler;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.WireOutPublisher;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.Wires;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;

/* loaded from: input_file:net/openhft/chronicle/engine/server/internal/SubscriptionHandler.class */
public class SubscriptionHandler<T extends SubscriptionCollection> extends AbstractHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHandler.class);
    final StringBuilder eventName = new StringBuilder();
    final Map<Long, Object> tidToListener = new ConcurrentHashMap();
    Wire outWire;
    T subscription;
    WireOutPublisher publisher;
    Asset asset;

    /* loaded from: input_file:net/openhft/chronicle/engine/server/internal/SubscriptionHandler$LocalSubscriber.class */
    class LocalSubscriber implements Subscriber<Object> {
        private final Long tid;
        private final WireOutPublisher publisher;
        volatile boolean subscriptionEnded;

        LocalSubscriber(Long l, WireOutPublisher wireOutPublisher) {
            this.tid = l;
            this.publisher = wireOutPublisher;
        }

        @Override // net.openhft.chronicle.engine.api.pubsub.Subscriber
        public void onMessage(Object obj) throws InvalidSubscriberException {
            if (this.subscriptionEnded) {
                return;
            }
            WriteMarshallable writeMarshallable = wireOut -> {
                wireOut.writeDocument(true, wireOut -> {
                    wireOut.writeEventName(CoreFields.tid).int64(this.tid.longValue());
                });
                wireOut.writeNotCompleteDocument(false, wireOut2 -> {
                    wireOut2.write(CoreFields.reply).object(obj);
                });
            };
            Object key = obj instanceof MapEvent ? ((MapEvent) obj).getKey() : obj;
            synchronized (this.publisher) {
                this.publisher.put(key, writeMarshallable);
            }
        }

        @Override // net.openhft.chronicle.engine.api.pubsub.ISubscriber
        public void onEndOfSubscription() {
            this.subscriptionEnded = true;
            synchronized (this.publisher) {
                if (!this.publisher.isClosed()) {
                    this.publisher.put(null, wireOut -> {
                        wireOut.writeDocument(true, wireOut -> {
                            wireOut.writeEventName(CoreFields.tid).int64(this.tid.longValue());
                        });
                        wireOut.writeDocument(false, wireOut2 -> {
                            wireOut2.writeEventName(ObjectKVSubscriptionHandler.EventId.onEndOfSubscription).text(XmlPullParser.NO_NAMESPACE);
                        });
                    });
                }
            }
        }

        @NotNull
        public String toString() {
            return "LocalSubscriber{tid=" + this.tid + '}';
        }
    }

    /* loaded from: input_file:net/openhft/chronicle/engine/server/internal/SubscriptionHandler$SubscriptionEventID.class */
    public enum SubscriptionEventID implements ParameterizeWireKey {
        registerSubscriber(new WireKey[0]),
        unregisterSubscriber(new WireKey[0]),
        keySubscriberCount(new WireKey[0]),
        entrySubscriberCount(new WireKey[0]),
        topicSubscriberCount(new WireKey[0]);

        private final WireKey[] params;

        SubscriptionEventID(WireKey... wireKeyArr) {
            this.params = wireKeyArr;
        }

        @Override // net.openhft.chronicle.wire.ParameterizeWireKey
        @NotNull
        public <P extends WireKey> P[] params() {
            return (P[]) this.params;
        }

        @Override // java.lang.Enum, net.openhft.chronicle.wire.WireKey
        public /* bridge */ /* synthetic */ CharSequence name() {
            return super.name();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean after(@NotNull StringBuilder sb) {
        if (SubscriptionEventID.topicSubscriberCount.contentEquals(sb)) {
            this.outWire.writeEventName(CoreFields.reply).int8(this.subscription.topicSubscriberCount());
            return true;
        }
        if (SubscriptionEventID.keySubscriberCount.contentEquals(sb)) {
            this.outWire.writeEventName(CoreFields.reply).int8(this.subscription.keySubscriberCount());
            return true;
        }
        if (!SubscriptionEventID.entrySubscriberCount.contentEquals(sb)) {
            return false;
        }
        this.outWire.writeEventName(CoreFields.reply).int8(this.subscription.entrySubscriberCount());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean before(Long l, @NotNull ValueIn valueIn) throws AssetNotFoundException {
        if (!SubscriptionEventID.registerSubscriber.contentEquals(this.eventName)) {
            if (!SubscriptionEventID.unregisterSubscriber.contentEquals(this.eventName)) {
                return false;
            }
            skipValue(valueIn);
            Subscriber<Object> subscriber = (Subscriber) this.tidToListener.remove(l);
            if (subscriber == null) {
                Jvm.debug().on(getClass(), "No subscriber to present to unregisterSubscriber (" + l + ")");
                return true;
            }
            this.asset.unregisterSubscriber(this.requestContext, subscriber);
            return true;
        }
        Class typeLiteral = valueIn.typeLiteral();
        StringBuilder acquireStringBuilder = Wires.acquireStringBuilder();
        Filter empty = "filter".contentEquals(acquireStringBuilder) ? (Filter) valueIn.wireIn().readEventName(acquireStringBuilder).object(Filter.class) : Filter.empty();
        if (this.tidToListener.containsKey(l)) {
            LOG.info("Duplicate registration for tid " + l);
            return true;
        }
        LocalSubscriber localSubscriber = new LocalSubscriber(l, this.requestContext.throttlePeriodMs() == 0 ? this.publisher : WireOutPublisher.newThrottledWireOutPublisher(this.requestContext.throttlePeriodMs(), this.publisher));
        this.tidToListener.put(l, localSubscriber);
        RequestContext elementType = this.requestContext.m276clone().elementType(typeLiteral);
        this.asset.acquireSubscription(elementType).registerSubscriber(elementType, localSubscriber, empty);
        return true;
    }

    @Override // net.openhft.chronicle.engine.server.internal.AbstractHandler
    protected void unregisterAll() {
        this.tidToListener.forEach((l, obj) -> {
            this.asset.unregisterSubscriber(this.requestContext, (Subscriber) obj);
        });
        this.tidToListener.clear();
    }
}
