/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import org.apache.camel.Exchange;
import org.apache.camel.component.mongodb.CamelMongoDbException;
import org.apache.camel.component.mongodb.MongoDbEndpoint;
import org.apache.camel.component.mongodb.MongoDbTailTrackingManager;
import org.apache.camel.component.mongodb.MongoDbTailableCursorConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbTailingProcess
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
    private static final String CAPPED_KEY = "capped";
    public volatile boolean keepRunning = true;
    public volatile boolean stopped;
    private final DBCollection dbCol;
    private final MongoDbEndpoint endpoint;
    private final MongoDbTailableCursorConsumer consumer;
    private final long cursorRegenerationDelay;
    private final boolean cursorRegenerationDelayEnabled;
    private DBCursor cursor;
    private MongoDbTailTrackingManager tailTracking;

    public MongoDbTailingProcess(MongoDbEndpoint endpoint, MongoDbTailableCursorConsumer consumer, MongoDbTailTrackingManager tailTrack) {
        this.endpoint = endpoint;
        this.consumer = consumer;
        this.dbCol = endpoint.getDbCollection();
        this.tailTracking = tailTrack;
        this.cursorRegenerationDelay = endpoint.getCursorRegenerationDelay();
        this.cursorRegenerationDelayEnabled = this.cursorRegenerationDelay != 0L;
    }

    public DBCursor getCursor() {
        return this.cursor;
    }

    public void initializeProcess() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: {}", (Object)("db: " + this.dbCol.getDB() + ", col: " + this.dbCol.getName()));
        }
        if (this.dbCol.getStats().getInt(CAPPED_KEY) != 1) {
            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + this.dbCol.getName() + " is not capped");
        }
        try {
            this.tailTracking.recoverFromStore();
            this.cursor = this.initializeCursor();
        }
        catch (Exception e) {
            throw new CamelMongoDbException("Exception ocurred while initializing tailable cursor", e);
        }
        if (this.cursor == null) {
            throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
        }
    }

    @Override
    public void run() {
        while (this.keepRunning) {
            this.doRun();
            if (!this.keepRunning) continue;
            this.cursor.close();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", this.tailTracking.lastVal, (Object)this.cursorRegenerationDelay);
            }
            if (this.cursorRegenerationDelayEnabled) {
                try {
                    Thread.sleep(this.cursorRegenerationDelay);
                }
                catch (InterruptedException e) {
                    LOG.error("Thread was interrupted", (Throwable)e);
                }
            }
            this.cursor = this.initializeCursor();
        }
        this.stopped = true;
    }

    protected void stop() throws Exception {
        if (LOG.isInfoEnabled()) {
            LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: {}", (Object)("db: " + this.dbCol.getDB() + ", col: " + this.dbCol.getName()));
        }
        this.keepRunning = false;
        if (this.cursor != null) {
            this.cursor.close();
        }
        while (!this.stopped) {
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: {}", (Object)("db: " + this.dbCol.getDB() + ", col: " + this.dbCol.getName()));
        }
    }

    private void doRun() {
        try {
            while (this.cursor.hasNext() && this.cursor.getCursorId() != 0L && this.keepRunning) {
                DBObject dbObj = this.cursor.next();
                Exchange exchange = this.endpoint.createMongoDbExchange(dbObj);
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Sending exchange: {}, ObjectId: {}", (Object)exchange, dbObj.get("_id"));
                    }
                    this.consumer.getProcessor().process(exchange);
                }
                catch (Exception exception) {
                    // empty catch block
                }
                this.tailTracking.setLastVal(dbObj);
            }
        }
        catch (MongoException.CursorNotFound e) {
            if (this.keepRunning) {
                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", (Throwable)e);
            }
        }
        catch (NullPointerException nullPointerException) {
            // empty catch block
        }
        this.tailTracking.persistToStore();
    }

    private DBCursor initializeCursor() {
        DBCursor answer;
        Object lastVal = this.tailTracking.lastVal;
        if (lastVal == null) {
            answer = this.dbCol.find().addOption(2).addOption(32);
        } else {
            BasicDBObject queryObj = new BasicDBObject(this.tailTracking.getIncreasingFieldName(), new BasicDBObject("$gt", lastVal));
            answer = this.dbCol.find(queryObj).addOption(2).addOption(32);
        }
        return answer;
    }
}

