package org.apache.camel.component.mongodb;

import com.mongodb.BasicDBObject;
import com.mongodb.CursorType;
import com.mongodb.DBCollection;
import com.mongodb.MongoCursorNotFoundException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.util.concurrent.CountDownLatch;
import org.apache.camel.Exchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/mongodb/MongoDbTailingProcess.class */
/**
 * Background task that tails a capped MongoDB collection with a tailable-await
 * cursor and hands every document read to the consumer's processor.
 *
 * <p>Lifecycle: call {@link #initializeProcess()} once to validate the collection
 * and open the first cursor, execute {@link #run()} on a dedicated thread, and
 * call {@link #stop()} from another thread to shut the loop down.
 *
 * <p>The fields {@link #keepRunning}, {@link #stopped}, the stop latch and the
 * cursor are shared between the tailing thread and the thread calling
 * {@link #stop()}, which is why they are declared {@code volatile}.
 */
public class MongoDbTailingProcess implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbTailingProcess.class);
    private static final String CAPPED_KEY = "capped";

    /** Loop flag; cleared by {@link #stop()} to request termination of {@link #run()}. */
    public volatile boolean keepRunning = true;
    /** Set to {@code true} once {@link #run()} has left its loop. */
    public volatile boolean stopped;

    // Created by run(); null until the tailing thread has actually started.
    private volatile CountDownLatch stoppedLatch;
    private final MongoCollection<BasicDBObject> dbCol;
    private final MongoDbEndpoint endpoint;
    private final MongoDbTailableCursorConsumer consumer;
    private final long cursorRegenerationDelay;
    private final boolean cursorRegenerationDelayEnabled;
    // Written by the tailing thread, read by stop()/getCursor() on other threads.
    private volatile MongoCursor<BasicDBObject> cursor;
    private final MongoDbTailTrackingManager tailTracking;

    /**
     * Creates a tailing process bound to the endpoint's collection.
     *
     * @param mongoDbEndpoint endpoint supplying the collection, tail tracking strategy and settings
     * @param mongoDbTailableCursorConsumer consumer whose processor receives each document
     * @param mongoDbTailTrackingManager manager that records and persists the last value read
     */
    public MongoDbTailingProcess(MongoDbEndpoint mongoDbEndpoint, MongoDbTailableCursorConsumer mongoDbTailableCursorConsumer, MongoDbTailTrackingManager mongoDbTailTrackingManager) {
        this.endpoint = mongoDbEndpoint;
        this.consumer = mongoDbTailableCursorConsumer;
        this.dbCol = mongoDbEndpoint.getMongoCollection();
        this.tailTracking = mongoDbTailTrackingManager;
        this.cursorRegenerationDelay = mongoDbEndpoint.getCursorRegenerationDelay();
        // A delay of exactly 0 disables the pause before regenerating a cursor.
        this.cursorRegenerationDelayEnabled = this.cursorRegenerationDelay != 0;
    }

    /**
     * Returns the cursor currently in use.
     *
     * @return the current cursor, or {@code null} before {@link #initializeProcess()} has succeeded
     */
    public MongoCursor<BasicDBObject> getCursor() {
        return this.cursor;
    }

    /**
     * Verifies that the target collection is capped, recovers the tail tracking
     * state from its store and opens the initial tailable cursor.
     *
     * @throws CamelMongoDbException if the collection is not capped, or if recovering the
     *         tracking state or creating the cursor fails
     * @throws Exception declared for compatibility with existing callers
     */
    public void initializeProcess() throws Exception {
        LOG.info("Starting MongoDB Tailable Cursor consumer, binding to collection: db: {}, col: {}",
                this.endpoint.getDatabase(), this.endpoint.getCollection());
        if (!isCollectionCapped()) {
            throw new CamelMongoDbException("Tailable cursors are only compatible with capped collections, and collection " + this.endpoint.getCollection() + " is not capped");
        }
        try {
            this.tailTracking.recoverFromStore();
            this.cursor = initializeCursor();
            if (this.cursor == null) {
                throw new CamelMongoDbException("Tailable cursor was not initialized, or cursor returned is dead on arrival");
            }
        } catch (Exception e) {
            throw new CamelMongoDbException("Exception occurred while initializing tailable cursor", e);
        }
    }

    /** Runs {@code collStats} and reports whether the result flags the collection as capped. */
    private boolean isCollectionCapped() {
        // A missing "capped" entry yields null, which is treated as "not capped".
        return Boolean.TRUE.equals(
                this.endpoint.getMongoDatabase().runCommand(createCollStatsCommand()).getBoolean(CAPPED_KEY));
    }

    /** Builds the {@code collStats} command document for the endpoint's collection. */
    private BasicDBObject createCollStatsCommand() {
        return new BasicDBObject("collStats", this.endpoint.getCollection());
    }

    /**
     * Tailing loop: drains the cursor and, while {@link #keepRunning} is still set,
     * closes it, optionally waits {@code cursorRegenerationDelay} ms and opens a new one.
     *
     * <p>{@link #stopped} is set and the stop latch is released in a {@code finally}
     * block, so a thread blocked in {@link #stop()} is always woken up, even when the
     * loop ends because of an unexpected exception.
     */
    @Override
    public void run() {
        final CountDownLatch latch = new CountDownLatch(1);
        this.stoppedLatch = latch;
        try {
            while (this.keepRunning) {
                doRun();
                if (!this.keepRunning) {
                    break;
                }
                // doRun() returned although we were not asked to stop: regenerate the cursor.
                this.cursor.close();
                LOG.debug("Regenerating cursor with lastVal: {}, waiting {}ms first", this.tailTracking.lastVal, this.cursorRegenerationDelay);
                if (this.cursorRegenerationDelayEnabled) {
                    try {
                        Thread.sleep(this.cursorRegenerationDelay);
                    } catch (InterruptedException e) {
                        // Interruption is treated as a request to terminate: restore the
                        // flag for whoever owns this thread and leave the loop.
                        Thread.currentThread().interrupt();
                        LOG.debug("Interrupted while waiting to regenerate cursor, stopping tailing");
                        break;
                    }
                }
                this.cursor = initializeCursor();
            }
        } finally {
            this.stopped = true;
            latch.countDown();
        }
    }

    /**
     * Requests termination of the tailing loop, closes the current cursor and blocks
     * until {@link #run()} has finished. Returns without waiting if {@link #run()}
     * was never started.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws Exception declared for compatibility with existing callers
     */
    public void stop() throws Exception {
        LOG.info("Stopping MongoDB Tailable Cursor consumer, bound to collection: db: {}, col: {}",
                this.endpoint.getDatabase(), this.endpoint.getCollection());
        this.keepRunning = false;
        // Read the volatile field once so the null check and close() see the same cursor.
        MongoCursor<BasicDBObject> current = this.cursor;
        if (current != null) {
            current.close();
        }
        awaitStopped();
        LOG.info("Stopped MongoDB Tailable Cursor consumer, bound to collection: db: {}, col: {}",
                this.endpoint.getDatabase(), this.endpoint.getCollection());
    }

    /**
     * Drains the current cursor: each document is wrapped in an exchange, passed to the
     * consumer's processor and recorded as the last tracked value. When the endpoint's
     * {@code persistRecords} is positive, the tracking state is persisted on the first
     * document and then every {@code persistRecords} documents; it is always persisted
     * once more before returning.
     *
     * <p>The whole loop, including {@code hasNext()}, is guarded so that a dead or
     * closed cursor makes this method return (letting {@link #run()} regenerate the
     * cursor or terminate) instead of killing the tailing thread.
     */
    private void doRun() {
        int counter = 0;
        final int persistRecords = this.endpoint.getPersistRecords();
        final boolean persistRegularly = persistRecords > 0;
        try {
            while (this.cursor.hasNext() && this.keepRunning) {
                BasicDBObject dbObj = this.cursor.next();
                Exchange exchange = this.endpoint.createMongoDbExchange(dbObj);
                try {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(DBCollection.ID_FIELD_NAME));
                    }
                    this.consumer.getProcessor().process(exchange);
                } catch (Exception e) {
                    // Best effort: a failing document must not stop the tailing, but it
                    // must not disappear silently either.
                    LOG.warn("Exception while processing exchange from tailable cursor, continuing with next document", e);
                }
                this.tailTracking.setLastVal(dbObj);
                if (persistRegularly && counter++ % persistRecords == 0) {
                    this.tailTracking.persistToStore();
                }
            }
        } catch (MongoCursorNotFoundException e) {
            if (this.keepRunning) {
                LOG.debug("Cursor not found exception from MongoDB, will regenerate cursor. This is normal behaviour with tailable cursors.", e);
            }
        } catch (IllegalStateException e) {
            // Presumably raised when stop() closes the cursor from another thread while
            // this thread is using it; either way the cursor is unusable, so return.
            LOG.trace("Cursor is no longer usable, leaving read loop", e);
        }
        // The loop finished: persist the last value in case we are shutting down.
        this.tailTracking.persistToStore();
    }

    /**
     * Opens a tailable-await cursor on the collection. Without a tracked last value the
     * whole collection is read; otherwise the query is built by the endpoint's tail
     * tracking strategy from the last value and the increasing field name.
     *
     * @return a newly opened cursor
     */
    private MongoCursor<BasicDBObject> initializeCursor() {
        Object lastVal = this.tailTracking.lastVal;
        if (lastVal == null) {
            return this.dbCol.find().cursorType(CursorType.TailableAwait).iterator();
        }
        return this.dbCol
                .find(this.endpoint.getTailTrackingStrategy().createQuery(lastVal, this.tailTracking.getIncreasingFieldName()))
                .cursorType(CursorType.TailableAwait)
                .iterator();
    }

    /**
     * Blocks until {@link #run()} has released the stop latch.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void awaitStopped() throws InterruptedException {
        if (this.stopped) {
            return;
        }
        CountDownLatch latch = this.stoppedLatch;
        if (latch == null) {
            // run() has not created the latch, so there is no running loop to wait for.
            return;
        }
        LOG.info("Going to wait for stopping");
        latch.await();
    }
}
