/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.cdc;

import java.util.EnumSet;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.internal.cdc.CdcEventImpl;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgnitePredicate;

public class WalRecordsConsumer<K, V> {
    public static final String EVTS_CNT = "EventsCount";
    public static final String LAST_EVT_TIME = "LastEventTime";
    private final IgniteLogger log;
    private final CdcConsumer consumer;
    private AtomicLongMetric evtsCnt;
    private AtomicLongMetric lastEvtTs;
    private static final EnumSet<GridCacheOperation> OPERATIONS_TYPES = EnumSet.of(GridCacheOperation.CREATE, GridCacheOperation.UPDATE, GridCacheOperation.DELETE, GridCacheOperation.TRANSFORM);
    private static final IgnitePredicate<? super DataEntry> OPERATIONS_FILTER = e -> {
        if (!(e instanceof UnwrappedDataEntry)) {
            throw new IllegalStateException("Unexpected data entry [type=" + e.getClass().getName() + ']');
        }
        if ((e.flags() & 2) != 0 || (e.flags() & 4) != 0) {
            return false;
        }
        return OPERATIONS_TYPES.contains((Object)e.op());
    };

    public WalRecordsConsumer(CdcConsumer consumer, IgniteLogger log) {
        this.consumer = consumer;
        this.log = log;
    }

    public boolean onRecords(final Iterator<DataRecord> recs) {
        Iterator<CdcEvent> evts = new Iterator<CdcEvent>(){
            private Iterator<CdcEvent> entries;

            @Override
            public boolean hasNext() {
                this.advance();
                return this.hasCurrent();
            }

            @Override
            public CdcEvent next() {
                this.advance();
                if (!this.hasCurrent()) {
                    throw new NoSuchElementException();
                }
                WalRecordsConsumer.this.evtsCnt.increment();
                WalRecordsConsumer.this.lastEvtTs.value(System.currentTimeMillis());
                return this.entries.next();
            }

            private void advance() {
                if (this.hasCurrent()) {
                    return;
                }
                while (recs.hasNext()) {
                    this.entries = F.iterator(((DataRecord)recs.next()).writeEntries().iterator(), this::transform, true, OPERATIONS_FILTER);
                    if (this.entries.hasNext()) break;
                    this.entries = null;
                }
            }

            private boolean hasCurrent() {
                return this.entries != null && this.entries.hasNext();
            }

            private CdcEvent transform(DataEntry e) {
                UnwrappedDataEntry ue = (UnwrappedDataEntry)((Object)e);
                return new CdcEventImpl(ue.unwrappedKey(), ue.unwrappedValue(), (e.flags() & 1) != 0, e.partitionId(), e.writeVersion(), e.cacheId());
            }
        };
        return this.consumer.onEvents(evts);
    }

    public void start(MetricRegistry cdcReg, MetricRegistry cdcConsumerReg) throws IgniteCheckedException {
        this.consumer.start(cdcConsumerReg);
        this.evtsCnt = cdcReg.longMetric(EVTS_CNT, "Count of events processed by the consumer");
        this.lastEvtTs = cdcReg.longMetric(LAST_EVT_TIME, "Time of the last event process");
        if (this.log.isDebugEnabled()) {
            this.log.debug("WalRecordsConsumer started [consumer=" + this.consumer.getClass() + ']');
        }
    }

    public void stop() {
        this.consumer.stop();
        if (this.log.isInfoEnabled()) {
            this.log.info("WalRecordsConsumer stopped [consumer=" + this.consumer.getClass() + ']');
        }
    }

    public CdcConsumer consumer() {
        return this.consumer;
    }

    public String toString() {
        return S.toString(WalRecordsConsumer.class, this);
    }
}

