/*
 * Decompiled with CFR 0.152.
 */
package krati.retention;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import krati.core.StoreConfig;
import krati.core.segment.SegmentFactory;
import krati.core.segment.WriteBufferSegmentFactory;
import krati.retention.Event;
import krati.retention.EventBatch;
import krati.retention.EventBatchCursor;
import krati.retention.EventBatchHeader;
import krati.retention.EventBatchSerializer;
import krati.retention.Position;
import krati.retention.Retention;
import krati.retention.RetentionConfig;
import krati.retention.RetentionFlushListener;
import krati.retention.SimpleEventBatch;
import krati.retention.SimpleEventBatchCursor;
import krati.retention.SimpleEventBatchSerializer;
import krati.retention.SimplePosition;
import krati.retention.clock.Clock;
import krati.retention.clock.Occurred;
import krati.retention.policy.RetentionPolicy;
import krati.store.BytesDB;
import krati.util.DaemonThreadFactory;
import org.apache.log4j.Logger;

/**
 * A {@link Retention} implementation that accumulates events in an in-memory
 * {@link EventBatch}, persists each completed batch into a {@link BytesDB}
 * store, and tracks the persisted batches through a queue of
 * {@link EventBatchCursor}s. A {@link RetentionPolicy} is applied to that
 * queue periodically on a background executor to discard batches.
 *
 * <p>Writers ({@link #put(Event)}, {@link #flush()}, {@link #open()},
 * {@link #close()}) are serialized on this instance's monitor. The swap of the
 * current/last batch references is additionally guarded by an internal lock so
 * that {@link #getClock(long)} and {@link #getPosition(Clock)} observe the two
 * references consistently.
 *
 * @param <T> the event value type
 */
public class SimpleRetention<T> implements Retention<T> {
    private static final Logger _logger = Logger.getLogger(SimpleRetention.class);
    private final int _id;
    private final File _homeDir;
    private final BytesDB _store;
    private final int _eventBatchSize;
    private final EventBatchSerializer<T> _eventBatchSerializer;
    /** Cursors of the batches that have been written to the store, in insertion order. */
    private final ConcurrentLinkedQueue<EventBatchCursor> _retentionQueue = new ConcurrentLinkedQueue<EventBatchCursor>();
    private final RetentionPolicy _retentionPolicy;
    private final RetentionPolicyApply _retentionPolicyApply = new RetentionPolicyApply();
    /**
     * Runs {@link #_retentionPolicyApply}. Not final: an executor cannot be reused
     * after shutdown, so a fresh one is created when the retention is re-opened.
     */
    private ScheduledExecutorService _retentionPolicyExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
    /** The batch currently receiving events. */
    private volatile EventBatch<T> _batch = null;
    /** The most recently persisted batch, kept in memory; null until the first flush. */
    private volatile EventBatch<T> _lastBatch = null;
    /** The cursor of {@link #_lastBatch}; null exactly when {@link #_lastBatch} is null. */
    private volatile EventBatchCursor _lastBatchCursor = null;
    /** Guards the swap of {@link #_batch}, {@link #_lastBatch} and {@link #_lastBatchCursor}. */
    private final Lock _batchLock = new ReentrantLock();
    private RetentionFlushListener _flushListener = null;

    /**
     * Creates a retention from the given configuration. The store lives in the
     * {@code "retention"} sub-directory of the configured home directory.
     *
     * @param config the retention configuration
     * @throws Exception if the underlying store cannot be created or initialized
     */
    public SimpleRetention(RetentionConfig<T> config) throws Exception {
        this(config.getId(), new File(config.getHomeDir(), "retention"), config.getRetentionInitialSize(), config.getRetentionPolicy(), new SimpleEventBatchSerializer<T>(config.getEventValueSerializer(), config.getEventClockSerializer()), config.getBatchSize(), config.getNumSyncBatchs(), config.getRetentionSegmentFactory(), config.getRetentionSegmentFileSizeMB());
    }

    /**
     * Creates a retention with an initial store size of 100000, a
     * {@link WriteBufferSegmentFactory} and a segment file size of 32 MB.
     *
     * @param id              the retention id
     * @param homeDir         the directory of the underlying store
     * @param retentionPolicy the policy that decides which batches are discarded
     * @param batchSerializer the serializer for event batches
     * @param batchSize       the requested number of events per batch
     * @throws Exception if the underlying store cannot be created or initialized
     */
    public SimpleRetention(int id, File homeDir, RetentionPolicy retentionPolicy, EventBatchSerializer<T> batchSerializer, int batchSize) throws Exception {
        this(id, homeDir, 100000, retentionPolicy, batchSerializer, batchSize, new WriteBufferSegmentFactory(), 32);
    }

    /**
     * Creates a retention whose store is configured with 10 sync batches.
     *
     * @param id                the retention id
     * @param homeDir           the directory of the underlying store
     * @param initialSize       the initial size passed to the store configuration
     * @param retentionPolicy   the policy that decides which batches are discarded
     * @param batchSerializer   the serializer for event batches
     * @param batchSize         the requested number of events per batch
     * @param segmentFactory    the segment factory of the underlying store
     * @param segmentFileSizeMB the segment file size of the underlying store, in MB
     * @throws Exception if the underlying store cannot be created or initialized
     */
    public SimpleRetention(int id, File homeDir, int initialSize, RetentionPolicy retentionPolicy, EventBatchSerializer<T> batchSerializer, int batchSize, SegmentFactory segmentFactory, int segmentFileSizeMB) throws Exception {
        this(id, homeDir, initialSize, retentionPolicy, batchSerializer, batchSize, 10, segmentFactory, segmentFileSizeMB);
    }

    /**
     * Creates the underlying store and initializes this retention from its content.
     *
     * @param id                the retention id
     * @param homeDir           the directory of the underlying store
     * @param initialSize       the initial size passed to the store configuration
     * @param retentionPolicy   the policy that decides which batches are discarded
     * @param batchSerializer   the serializer for event batches
     * @param batchSize         the requested number of events per batch; values below 100 are raised to 100
     * @param numSyncBatches    the number of sync batches passed to the store configuration
     * @param segmentFactory    the segment factory of the underlying store
     * @param segmentFileSizeMB the segment file size of the underlying store, in MB
     * @throws Exception if the underlying store cannot be created or initialized
     */
    protected SimpleRetention(int id, File homeDir, int initialSize, RetentionPolicy retentionPolicy, EventBatchSerializer<T> batchSerializer, int batchSize, int numSyncBatches, SegmentFactory segmentFactory, int segmentFileSizeMB) throws Exception {
        this._id = id;
        this._homeDir = homeDir;
        this._retentionPolicy = retentionPolicy;
        this._eventBatchSerializer = batchSerializer;
        this._eventBatchSize = Math.max(100, batchSize);

        StoreConfig config = new StoreConfig(homeDir, initialSize);
        config.setBatchSize(1);
        config.setNumSyncBatches(numSyncBatches);
        config.setSegmentFileSizeMB(segmentFileSizeMB);
        config.setSegmentFactory(segmentFactory);
        this._store = new BytesDB(config);
        this.init();
    }

    /**
     * Rebuilds the in-memory state from the store: reads the header of every
     * stored batch, queues the resulting cursors ordered by batch origin, starts
     * a new empty current batch right after the last stored one, and schedules
     * the retention policy.
     *
     * <p>Entries whose header cannot be read are logged and skipped.
     *
     * @throws IOException if the store cannot be synced
     */
    protected void init() throws IOException {
        this._store.sync();

        int length = this._store.capacity();
        ArrayList<SimpleEventBatchCursor> list = new ArrayList<SimpleEventBatchCursor>(length / 2);
        for (int index = 0; index < length; ++index) {
            if (!this._store.hasData(index)) {
                continue;
            }
            try {
                byte[] bytes = this._store.get(index);
                EventBatchHeader header = this._eventBatchSerializer.deserializeHeader(bytes);
                list.add(new SimpleEventBatchCursor(index, header));
            } catch (Exception e) {
                _logger.error("Failed to open a cursor", e);
            }
        }

        Clock batchClock = Clock.ZERO;
        long batchOrigin = 0L;
        int cnt = list.size();
        if (cnt > 0) {
            Collections.sort(list, new Comparator<EventBatchCursor>() {
                @Override
                public int compare(EventBatchCursor c1, EventBatchCursor c2) {
                    // Compare explicitly: narrowing the long difference to int can
                    // flip or zero the sign when origins are far apart.
                    long o1 = c1.getHeader().getOrigin();
                    long o2 = c2.getHeader().getOrigin();
                    return o1 < o2 ? -1 : (o1 == o2 ? 0 : 1);
                }
            });
            for (EventBatchCursor cursor : list) {
                this._retentionQueue.add(cursor);
            }
            // The new current batch continues where the newest stored batch ended.
            EventBatchHeader header = list.get(cnt - 1).getHeader();
            batchOrigin = header.getOrigin() + (long) header.getSize();
            batchClock = header.getMaxClock();
        }

        this._batch = this.nextEventBatch(batchOrigin, batchClock);
        this._lastBatch = null;
        this._lastBatchCursor = null;
        this.scheduleRetentionPolicy();

        _logger.info("init " + cnt + " batches");
        _logger.info("init position=" + this.getPosition());
        _logger.info("init batch=" + this._batch);
    }

    /**
     * Creates a new, empty event batch.
     *
     * @param offset    the origin of the new batch
     * @param initClock the initial clock of the new batch
     * @return the new batch, with a capacity of {@link #getEventBatchSize()}
     */
    protected EventBatch<T> nextEventBatch(long offset, Clock initClock) {
        SimpleEventBatch<T> b = new SimpleEventBatch<T>(offset, initClock, this._eventBatchSize);
        _logger.info("Created EventBatch: " + b.getOrigin());
        return b;
    }

    /**
     * Schedules the retention policy to run after 1 second and then with a
     * 5-second delay between runs. If the executor was shut down by a previous
     * {@link #close()}, a new one is created first, because a shut-down executor
     * rejects new tasks.
     */
    protected void scheduleRetentionPolicy() {
        if (this._retentionPolicyExecutor.isShutdown()) {
            this._retentionPolicyExecutor = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory());
        }
        this._retentionPolicyExecutor.scheduleWithFixedDelay(this._retentionPolicyApply, 1L, 5L, TimeUnit.SECONDS);
    }

    /** @return the directory of the underlying store */
    public final File getHomeDir() {
        return this._homeDir;
    }

    /** @return the effective number of events per batch (at least 100) */
    public final int getEventBatchSize() {
        return this._eventBatchSize;
    }

    /** @return the serializer used for event batches */
    public final EventBatchSerializer<T> getEventBatchSerializer() {
        return this._eventBatchSerializer;
    }

    /** @return the listener notified around batch flushes, or null if none is set */
    public final RetentionFlushListener getFlushListener() {
        return this._flushListener;
    }

    /**
     * Sets the listener notified before and after a batch is written to the store.
     *
     * @param l the listener, or null to remove it
     */
    public final void setFlushListener(RetentionFlushListener l) {
        this._flushListener = l;
    }

    /** @return the retention id */
    @Override
    public final int getId() {
        return this._id;
    }

    /**
     * @return the origin of the first queued batch, or the origin of the current
     *         batch if no batch is queued
     */
    @Override
    public long getOrigin() {
        long batchOrigin = this._batch.getOrigin();
        EventBatchCursor cursor = this._retentionQueue.peek();
        return cursor == null ? batchOrigin : cursor.getHeader().getOrigin();
    }

    /** @return the origin of the current batch plus its size */
    @Override
    public long getOffset() {
        return this._batch.getOrigin() + (long) this._batch.getSize();
    }

    /**
     * @return the min clock of the first queued batch, or the min clock of the
     *         current batch if no batch is queued
     */
    @Override
    public Clock getMinClock() {
        Clock batchMinClock = this._batch.getMinClock();
        EventBatchCursor cursor = this._retentionQueue.peek();
        return cursor == null ? batchMinClock : cursor.getHeader().getMinClock();
    }

    /** @return the max clock of the current batch */
    @Override
    public Clock getMaxClock() {
        return this._batch.getMaxClock();
    }

    /**
     * Looks up the clock at the given offset.
     *
     * @param offset the event offset
     * @return null if {@code offset} is below {@link #getOrigin()};
     *         {@link #getMaxClock()} if it is at or beyond {@link #getOffset()};
     *         otherwise the clock found in the current batch, the last batch or
     *         a stored batch, or null if none of them yields one
     */
    @Override
    public Clock getClock(long offset) {
        if (offset < this.getOrigin()) {
            return null;
        }
        if (offset >= this.getOffset()) {
            return this.getMaxClock();
        }

        // In-memory batches first; the lock keeps _batch/_lastBatch consistent.
        this._batchLock.lock();
        try {
            Clock clock = this._batch.getClock(offset);
            if (clock != null) {
                return clock;
            }
            EventBatch<T> last = this._lastBatch;
            if (last != null) {
                clock = last.getClock(offset);
                if (clock != null) {
                    return clock;
                }
            }
        } finally {
            this._batchLock.unlock();
        }

        // Fall back to the stored batches.
        int cnt = 0;
        for (EventBatchCursor c : this._retentionQueue) {
            EventBatchHeader h = c.getHeader();
            long start = h.getOrigin();
            if (start <= offset) {
                if (offset < start + (long) h.getSize()) {
                    byte[] dat = this._store.get(c.getLookup());
                    try {
                        EventBatch<T> b = (EventBatch<T>) this._eventBatchSerializer.deserialize(dat);
                        Clock clock = b.getClock(offset);
                        if (clock != null) {
                            return clock;
                        }
                    } catch (Exception e) {
                        _logger.warn(e.getMessage(), e);
                    }
                }
            } else if (cnt == 0) {
                // The head of the queue already starts beyond the offset.
                break;
            }
            ++cnt;
        }
        return null;
    }

    /** @return the effective number of events per batch (at least 100) */
    @Override
    public final int getBatchSize() {
        return this._eventBatchSize;
    }

    /** @return the retention policy applied to the stored batches */
    @Override
    public final RetentionPolicy getRetentionPolicy() {
        return this._retentionPolicy;
    }

    /** @return a position built from {@link #getId()}, {@link #getOffset()} and {@link #getMaxClock()} */
    @Override
    public final Position getPosition() {
        return new SimplePosition(this.getId(), this.getOffset(), this.getMaxClock());
    }

    /**
     * Finds the position that corresponds to the given clock.
     *
     * @param sinceClock the clock to locate
     * @return the position at {@link #getOrigin()} if {@code sinceClock} compares
     *         {@code EQUICONCURRENTLY} to {@link #getMinClock()}; null if it
     *         compares {@code BEFORE} or {@code CONCURRENTLY} to it;
     *         {@link #getPosition()} if it is after {@link #getMaxClock()};
     *         otherwise the position resolved from the current batch, the last
     *         batch or a stored batch, or null if none of them resolves it
     */
    @Override
    public Position getPosition(Clock sinceClock) {
        Occurred occ = sinceClock.compareTo(this.getMinClock());
        if (occ == Occurred.EQUICONCURRENTLY) {
            return new SimplePosition(this.getId(), this.getOrigin(), this.getMinClock());
        }
        if (occ == Occurred.BEFORE || occ == Occurred.CONCURRENTLY) {
            return null;
        }
        if (sinceClock.after(this.getMaxClock())) {
            return this.getPosition();
        }

        long sinceOffset;

        // In-memory batches first; the lock keeps _batch/_lastBatch consistent.
        this._batchLock.lock();
        try {
            EventBatch<T> b1 = this._batch;
            sinceOffset = b1.getOffset(sinceClock);
            if (sinceOffset >= 0L) {
                return new SimplePosition(this.getId(), sinceOffset, b1.getClock(sinceOffset));
            }

            EventBatch<T> b2 = this._lastBatch;
            if (b2 != null) {
                if (b2.getMaxClock().before(sinceClock)) {
                    // sinceClock lies past the last batch but was not found in the current one.
                    if (b1.getMinClock().compareTo(sinceClock) == Occurred.EQUICONCURRENTLY) {
                        return new SimplePosition(this.getId(), b1.getOrigin(), b1.getMinClock());
                    }
                    sinceOffset = b2.getOrigin() + (long) b2.getSize();
                    return new SimplePosition(this.getId(), sinceOffset, b2.getClock(sinceOffset));
                }
                sinceOffset = b2.getOffset(sinceClock);
                if (sinceOffset >= 0L) {
                    return new SimplePosition(this.getId(), sinceOffset, b2.getClock(sinceOffset));
                }
            }
        } finally {
            this._batchLock.unlock();
        }

        // Fall back to the stored batches.
        int cnt = 0;
        for (EventBatchCursor c : this._retentionQueue) {
            EventBatchHeader h = c.getHeader();
            occ = h.getMinClock().compareTo(sinceClock);
            if (occ == Occurred.EQUICONCURRENTLY) {
                if (cnt == 0) {
                    break;
                }
                return new SimplePosition(this.getId(), h.getOrigin(), h.getMinClock());
            }
            if (occ == Occurred.BEFORE) {
                if (!sinceClock.after(h.getMaxClock())) {
                    byte[] dat = this._store.get(c.getLookup());
                    try {
                        EventBatch<T> b = (EventBatch<T>) this._eventBatchSerializer.deserialize(dat);
                        sinceOffset = b.getOffset(sinceClock);
                        if (sinceOffset >= 0L) {
                            return new SimplePosition(this.getId(), sinceOffset, b.getClock(sinceOffset));
                        }
                    } catch (Exception e) {
                        _logger.warn(e.getMessage(), e);
                    }
                }
            } else if (cnt == 0) {
                break;
            }
            ++cnt;
        }
        return null;
    }

    /**
     * Reads events starting at the given position into {@code list}.
     *
     * @param pos  the position to read from
     * @param list the list that receives the events
     * @return the position after the events that were read, or null if
     *         {@code pos} is below {@link #getOrigin()}, is indexed, or no batch
     *         yields any event for it
     */
    @Override
    public Position get(Position pos, List<Event<T>> list) {
        if (pos.getOffset() < this.getOrigin() || pos.isIndexed()) {
            return null;
        }

        // Current batch, then the last batch.
        EventBatch<T> b = this._batch;
        if (b.getOrigin() <= pos.getOffset()) {
            return this.readFromBatch(b, pos, list);
        }
        b = this._lastBatch;
        if (b != null && b.getOrigin() <= pos.getOffset()) {
            return this.readFromBatch(b, pos, list);
        }

        // Fall back to the stored batches.
        int cnt = 0;
        for (EventBatchCursor c : this._retentionQueue) {
            if (c.getHeader().getOrigin() <= pos.getOffset()) {
                byte[] dat = this._store.get(c.getLookup());
                try {
                    b = (EventBatch<T>) this._eventBatchSerializer.deserialize(dat);
                    long newOffset = b.get(pos.getOffset(), list);
                    if (pos.getOffset() < newOffset) {
                        return new SimplePosition(this.getId(), newOffset, b.getClock(newOffset - 1L));
                    }
                } catch (Exception e) {
                    _logger.warn("Ignored EventBatch: " + c.getHeader().getOrigin(), e);
                }
            } else if (cnt == 0) {
                break;
            }
            ++cnt;
        }
        return null;
    }

    /**
     * Reads from an in-memory batch and builds the resulting position. If no
     * event was read, the clock of {@code pos} is carried over.
     */
    private Position readFromBatch(EventBatch<T> batch, Position pos, List<Event<T>> list) {
        long newOffset = batch.get(pos.getOffset(), list);
        Clock clock = pos.getOffset() < newOffset ? batch.getClock(newOffset - 1L) : pos.getClock();
        return new SimplePosition(this.getId(), newOffset, clock);
    }

    /**
     * Appends an event. If the current batch is full it is first written to the
     * store, queued for retention and replaced by a new batch whose initial
     * clock is the clock of {@code event}.
     *
     * @param event the event to append
     * @return the result of putting the event into the current batch
     * @throws Exception if the full batch cannot be stored or the event cannot be put
     */
    @Override
    public synchronized boolean put(Event<T> event) throws Exception {
        if (this._batch.isFull()) {
            this._batch.setCompletionTime(System.currentTimeMillis());
            byte[] bytes = this._eventBatchSerializer.serialize(this._batch);

            if (this._flushListener != null) {
                this._flushListener.beforeFlush(this._batch);
            }
            int batchId = this._store.add(bytes, this.getOffset());
            if (this._flushListener != null) {
                this._flushListener.afterFlush(this._batch);
            }

            this.rollToNextBatch(new SimpleEventBatchCursor(batchId, this._batch.getHeader()), event.getClock());
        }
        return this._batch.put(event);
    }

    /**
     * Queues the cursor of the just-stored current batch, then makes that batch
     * the last batch and starts a new current batch right after it.
     */
    private void rollToNextBatch(EventBatchCursor cursor, Clock initClock) {
        this._retentionQueue.offer(cursor);
        this._batchLock.lock();
        try {
            this._lastBatch = this._batch;
            this._lastBatchCursor = cursor;
            this._batch = this.nextEventBatch(this._batch.getOrigin() + (long) this._batch.getSize(), initClock);
        } finally {
            this._batchLock.unlock();
        }
    }

    /** @return whether the underlying store is open */
    @Override
    public boolean isOpen() {
        return this._store.isOpen();
    }

    /**
     * Opens the underlying store and reschedules the retention policy. Does
     * nothing if the store is already open.
     *
     * @throws IOException if the store cannot be opened
     */
    @Override
    public synchronized void open() throws IOException {
        if (!this._store.isOpen()) {
            this._store.open();
            this.scheduleRetentionPolicy();
        }
    }

    /**
     * Stops the retention policy executor and closes the underlying store. Does
     * nothing if the store is already closed.
     *
     * @throws IOException if the store cannot be closed
     */
    @Override
    public synchronized void close() throws IOException {
        if (this._store.isOpen()) {
            this._retentionPolicyExecutor.shutdown();
            this._store.close();
        }
    }

    /**
     * Persists the events of the current batch if this retention is open and the
     * batch is not empty. The events are merged into the last stored batch when
     * they fit; otherwise the current batch is stored as a new batch.
     *
     * @throws IOException if the batch cannot be written to the store
     */
    @Override
    public synchronized void flush() throws IOException {
        if (!this.isOpen() || this._batch.isEmpty()) {
            return;
        }
        if (this.mergeEventsToLastBatch()) {
            return;
        }

        this._batch.setCompletionTime(System.currentTimeMillis());
        byte[] bytes = this._eventBatchSerializer.serialize(this._batch);

        if (this._flushListener != null) {
            this._flushListener.beforeFlush(this._batch);
        }
        int batchId;
        try {
            batchId = this._store.add(bytes, this.getOffset());
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException(e);
        }
        if (this._flushListener != null) {
            this._flushListener.afterFlush(this._batch);
        }

        this.rollToNextBatch(new SimpleEventBatchCursor(batchId, this._batch.getHeader()), this._batch.getMaxClock());
    }

    /**
     * Tries to append the events of the current batch to a copy of the last
     * stored batch and to overwrite that batch in the store. This is attempted
     * only if a last batch exists and the combined size does not exceed
     * {@link #getEventBatchSize()}.
     *
     * @return true if the events were merged and a new current batch was
     *         started; false if merging was not applicable or was aborted
     * @throws IOException declared for subclasses; failures in this
     *         implementation are logged and reported as {@code false}
     */
    protected boolean mergeEventsToLastBatch() throws IOException {
        if (this._lastBatch == null || this._eventBatchSize < this._lastBatch.getSize() + this._batch.getSize()) {
            return false;
        }

        this._batch.setCompletionTime(System.currentTimeMillis());
        if (this._flushListener != null) {
            this._flushListener.beforeFlush(this._batch);
        }

        // Work on a copy so that readers never see a half-merged last batch.
        @SuppressWarnings("unchecked")
        SimpleEventBatch<T> copy = (SimpleEventBatch<T>) ((SimpleEventBatch<T>) this._lastBatch).clone();
        try {
            Iterator<Event<T>> iter = this._batch.iterator();
            while (iter.hasNext()) {
                copy.put(iter.next());
            }
            copy.setCompletionTime(this._batch.getCompletionTime());
            byte[] bytes = this._eventBatchSerializer.serialize(copy);
            this._store.set(this._lastBatchCursor.getLookup(), bytes, this.getOffset());
        } catch (Exception e) {
            _logger.info("events merge aborted", e);
            return false;
        }

        if (this._flushListener != null) {
            this._flushListener.afterFlush(this._batch);
        }

        this._batchLock.lock();
        try {
            this._lastBatch = copy;
            this._lastBatchCursor.setHeader(copy.getHeader());
            _logger.info(this._batch.getSize() + " events merged to EventBatch " + this._lastBatchCursor.getLookup());
            this._batch = this.nextEventBatch(this._batch.getOrigin() + (long) this._batch.getSize(), this._batch.getMaxClock());
        } finally {
            this._batchLock.unlock();
        }
        return true;
    }

    /**
     * Periodic task that asks the retention policy which queued batches to
     * discard, optionally hands each of them to the policy callback, and then
     * clears them from the store.
     */
    private class RetentionPolicyApply implements Runnable {
        private RetentionPolicyApply() {
        }

        @Override
        public void run() {
            Collection<EventBatchCursor> discard;
            try {
                discard = SimpleRetention.this._retentionPolicy.apply(SimpleRetention.this._retentionQueue);
            } catch (Exception e) {
                // An exception escaping run() would make the scheduler suppress
                // every subsequent execution of this task.
                _logger.error("Failed to apply retention policy", e);
                return;
            }
            if (discard == null || discard.isEmpty()) {
                return;
            }

            for (EventBatchCursor c : discard) {
                int index = c.getLookup();
                try {
                    if (SimpleRetention.this._retentionPolicy.isCallback()) {
                        try {
                            byte[] dat = SimpleRetention.this._store.get(index);
                            EventBatch<T> b = (EventBatch<T>) SimpleRetention.this._eventBatchSerializer.deserialize(dat);
                            SimpleRetention.this._retentionPolicy.applyCallbackOn(b);
                        } catch (Exception e) {
                            // A callback failure does not prevent the removal below.
                            if (SimpleRetention.this._store.isOpen()) {
                                _logger.error("Failed to apply callback on cursor: " + c.getHeader().getOrigin(), e);
                            }
                        }
                    }
                    SimpleRetention.this._store.set(index, null, SimpleRetention.this.getOffset());
                    _logger.info("Removed EventBatch: " + c.getHeader().getOrigin());
                } catch (Exception e) {
                    // Failures while the store is closed are not reported.
                    if (SimpleRetention.this._store.isOpen()) {
                        _logger.error("Failed to apply retention policy on cursor " + index, e);
                    }
                }
            }
        }
    }
}

