/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.client.thin;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
import org.apache.ignite.cache.query.CacheEntryEventAdapter;
import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
import org.apache.ignite.internal.binary.streams.BinaryByteBufferInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.client.thin.ClientBinaryMarshaller;
import org.apache.ignite.internal.client.thin.ClientChannel;
import org.apache.ignite.internal.client.thin.ClientError;
import org.apache.ignite.internal.client.thin.ClientNotificationType;
import org.apache.ignite.internal.client.thin.ClientOperation;
import org.apache.ignite.internal.client.thin.ClientUtils;
import org.apache.ignite.internal.client.thin.NotificationListener;
import org.apache.ignite.internal.client.thin.PayloadInputChannel;
import org.apache.ignite.internal.client.thin.PayloadOutputChannel;
import org.apache.ignite.internal.client.thin.ReliableChannel;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
 * Bridges a server-side continuous query to a local {@link CacheEntryUpdatedListener}.
 * <p>
 * {@link #startListen} sends a {@code QUERY_CONTINUOUS} request and registers this handler as the
 * notification listener for the resource id returned by the server. Incoming notifications are
 * decoded in {@link #acceptNotification} and passed to the local listener; {@link #close()}
 * unregisters the handler and asks the server to release the resource.
 * <p>
 * {@link #startListen} and {@link #close()} are {@code synchronized} on this instance; the mutable
 * fields are {@code volatile} because they are also read from the notification callbacks.
 *
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public class ClientCacheEntryListenerHandler<K, V> implements NotificationListener, AutoCloseable {
    /** Value of the flags byte written to the request when "keep binary" mode is enabled. */
    private static final byte KEEP_BINARY_FLAG_MASK = 1;

    /** Cache the events belong to; used for the cache name and as the source of produced events. */
    private final Cache<K, V> jCacheAdapter;

    /** Channel used to send the initial continuous query request. */
    private final ReliableChannel ch;

    /** Whether keys and values are read in "keep binary" mode. */
    private final boolean keepBinary;

    /** Helper used to write the remote filter factory and to read keys and values. */
    private final ClientUtils utils;

    /** Local listener that receives decoded events; set by {@link #startListen}. */
    private volatile CacheEntryUpdatedListener<K, V> locLsnr;

    /** Optional listener notified from {@link #onChannelClosed}; may be {@code null}. */
    private volatile ClientDisconnectListener disconnectLsnr;

    /** Channel the query was started on; {@code null} until {@link #startListen} succeeds. */
    private volatile ClientChannel clientCh;

    /** Resource id returned by the server for the started query. */
    private volatile Long rsrcId;

    /**
     * @param jCacheAdapter Cache adapter used as the event source.
     * @param ch Channel used to start the continuous query.
     * @param marsh Marshaller wrapped into a {@link ClientUtils} instance.
     * @param keepBinary Keep binary flag.
     */
    ClientCacheEntryListenerHandler(Cache<K, V> jCacheAdapter, ReliableChannel ch, ClientBinaryMarshaller marsh, boolean keepBinary) {
        this.jCacheAdapter = jCacheAdapter;
        this.ch = ch;
        this.keepBinary = keepBinary;
        this.utils = new ClientUtils(marsh);
    }

    /**
     * Sends the continuous query request and starts listening for its notifications.
     *
     * @param locLsnr Local listener that will receive events; must not be {@code null}.
     * @param disconnectLsnr Listener notified when the channel is closed; may be {@code null}.
     * @param rmtFilterFactory Remote filter factory; {@code null} if no remote filter is used.
     * @param pageSize Page size, written to the request as is.
     * @param timeInterval Time interval, written to the request as is.
     * @param includeExpired Include-expired flag, written to the request as is.
     * @throws IllegalStateException If the listener was already started.
     * @throws ClientException If the request fails with a {@link ClientError}.
     */
    public synchronized void startListen(CacheEntryUpdatedListener<K, V> locLsnr, ClientDisconnectListener disconnectLsnr, Factory<? extends CacheEntryEventFilter<? super K, ? super V>> rmtFilterFactory, int pageSize, long timeInterval, boolean includeExpired) {
        assert (locLsnr != null);
        if (this.clientCh != null) {
            throw new IllegalStateException("Listener was already started");
        }
        this.locLsnr = locLsnr;
        this.disconnectLsnr = disconnectLsnr;
        Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
            BinaryOutputStream out = payloadCh.out();
            out.writeInt(ClientUtils.cacheId(this.jCacheAdapter.getName()));
            out.writeByte(this.keepBinary ? KEEP_BINARY_FLAG_MASK : 0);
            out.writeInt(pageSize);
            out.writeLong(timeInterval);
            out.writeBoolean(includeExpired);
            if (rmtFilterFactory == null) {
                // Written in place of the filter object when there is no remote filter.
                // NOTE(review): 101 is presumably the binary protocol's null type code - confirm.
                out.writeByte((byte)101);
            } else {
                this.utils.writeObject(out, rmtFilterFactory);
                // NOTE(review): trailing byte presumably identifies the filter platform (Java) - confirm.
                out.writeByte((byte)1);
            }
        };
        Function<PayloadInputChannel, T2<ClientChannel, Long>> qryReader = payloadCh -> {
            ClientChannel notifCh = payloadCh.clientChannel();
            Long id = payloadCh.in().readLong();
            // Registered while the response is being read, i.e. before service() returns.
            notifCh.addNotificationListener(ClientNotificationType.CONTINUOUS_QUERY_EVENT, id, this);
            return new T2<>(notifCh, id);
        };
        try {
            T2<ClientChannel, Long> params = this.ch.service(ClientOperation.QUERY_CONTINUOUS, qryWriter, qryReader);
            this.clientCh = params.get1();
            this.rsrcId = params.get2();
        }
        catch (ClientError e) {
            throw new ClientException(e);
        }
    }

    /**
     * Decodes a batch of events from the payload and passes it to the local listener.
     * <p>
     * Nothing is done when {@code err} is not {@code null} or {@code payload} is {@code null}.
     * If an event carries an unknown type byte, the failure is reported through
     * {@link #onChannelClosed} and the batch is dropped: the listener is never handed an event
     * without an event type.
     *
     * @param payload Notification payload: an event count followed by key, old value, new value
     *      and event type byte for each event.
     * @param err Error reported with the notification, if any.
     */
    // The list stays raw because its element type (javax.cache.event.CacheEntryEvent) is not imported here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override
    public void acceptNotification(ByteBuffer payload, Exception err) {
        if (err != null || payload == null) {
            return;
        }
        BinaryByteBufferInputStream in = BinaryByteBufferInputStream.create(payload);
        int cnt = in.readInt();
        ArrayList evts = new ArrayList(cnt);
        for (int i = 0; i < cnt; ++i) {
            K key = this.utils.readObject(in, this.keepBinary);
            V oldVal = this.utils.readObject(in, this.keepBinary);
            V val = this.utils.readObject(in, this.keepBinary);
            byte evtTypeByte = in.readByte();
            EventType evtType = this.eventType(evtTypeByte);
            if (evtType == null) {
                this.onChannelClosed(new ClientException("Unknown event type: " + evtTypeByte));
                // The handler has just been closed; do not deliver a malformed batch.
                return;
            }
            evts.add(new CacheEntryEventImpl<K, V>(this.jCacheAdapter, evtType, key, oldVal, val));
        }
        this.locLsnr.onUpdated(evts);
    }

    /**
     * Notifies the disconnect listener, if one was supplied, and then closes this handler
     * through {@code U.closeQuiet}.
     *
     * @param reason Reason passed to the disconnect listener.
     */
    @Override
    public void onChannelClosed(Exception reason) {
        ClientDisconnectListener lsnr = this.disconnectLsnr;
        if (lsnr != null) {
            lsnr.onDisconnected(reason);
        }
        U.closeQuiet(this);
    }

    /**
     * Stops listening: if the query was started and its channel is not closed, removes this
     * handler from the channel's notification listeners and sends a {@code RESOURCE_CLOSE}
     * request with the query's resource id. Otherwise does nothing.
     */
    @Override
    public synchronized void close() {
        ClientChannel clientCh = this.clientCh;
        if (clientCh != null && !clientCh.closed()) {
            clientCh.removeNotificationListener(ClientNotificationType.CONTINUOUS_QUERY_EVENT, this.rsrcId);
            clientCh.service(ClientOperation.RESOURCE_CLOSE, payloadCh -> payloadCh.out().writeLong(this.rsrcId), null);
        }
    }

    /**
     * @return Channel the query was started on, or {@code null} if it was not started.
     */
    public ClientChannel clientChannel() {
        return this.clientCh;
    }

    /**
     * Maps the event type byte read from a notification to a JCache event type.
     *
     * @param evtTypeByte Event type byte.
     * @return Matching event type, or {@code null} if the byte is not one of 0..3.
     */
    private EventType eventType(byte evtTypeByte) {
        switch (evtTypeByte) {
            case 0: {
                return EventType.CREATED;
            }
            case 1: {
                return EventType.UPDATED;
            }
            case 2: {
                return EventType.REMOVED;
            }
            case 3: {
                return EventType.EXPIRED;
            }
        }
        return null;
    }

    /**
     * Immutable event holding the key, old value and new value read from a notification.
     *
     * @param <K> Cache key type.
     * @param <V> Cache value type.
     */
    private static class CacheEntryEventImpl<K, V>
    extends CacheEntryEventAdapter<K, V> {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Entry key. */
        private final K key;

        /** Value before the event; may be {@code null}. */
        private final V oldVal;

        /** Value after the event; may be {@code null}. */
        private final V newVal;

        /**
         * @param src Cache the event originates from.
         * @param evtType Event type.
         * @param key Entry key.
         * @param oldVal Old value.
         * @param newVal New value.
         */
        private CacheEntryEventImpl(Cache<K, V> src, EventType evtType, K key, V oldVal, V newVal) {
            super(src, evtType);
            this.key = key;
            this.oldVal = oldVal;
            this.newVal = newVal;
        }

        /** @return Entry key. */
        @Override
        public K getKey() {
            return this.key;
        }

        /** @return Value after the event. */
        @Override
        protected V getNewValue() {
            return this.newVal;
        }

        /** @return Value before the event. */
        @Override
        public V getOldValue() {
            return this.oldVal;
        }

        /** @return {@code true} if the old value is not {@code null}. */
        @Override
        public boolean isOldValueAvailable() {
            return this.oldVal != null;
        }
    }
}

