/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.mongodb;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Processor;
import org.apache.camel.component.mongodb.MongoDbChangeStreamsThread;
import org.apache.camel.component.mongodb.MongoDbEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.bson.BsonDocument;

/**
 * Consumer that, on start, builds a {@link MongoDbChangeStreamsThread} for its
 * endpoint and submits it to an executor obtained from the Camel context, and
 * on stop asks that thread to stop and shuts the executor down again.
 */
public class MongoDbChangeStreamsConsumer extends DefaultConsumer {
    private final MongoDbEndpoint endpoint;
    private ExecutorService executor;
    private MongoDbChangeStreamsThread changeStreamsThread;

    /**
     * Creates the consumer.
     *
     * @param endpoint  endpoint supplying the stream filter, the endpoint URI and the Camel context
     * @param processor processor handed through to {@link DefaultConsumer}
     */
    public MongoDbChangeStreamsConsumer(MongoDbEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
    }

    /**
     * Starts the consumer: parses the endpoint's stream filter (if any), requests a
     * fixed thread pool of size 1 from the Camel context's executor service manager,
     * then creates, initialises and submits the change-streams thread.
     *
     * @throws Exception if the superclass start, filter parsing, or thread initialisation fails
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();

        List<BsonDocument> filterDocuments = toFilterDocuments(endpoint.getStreamFilter());

        executor = endpoint.getCamelContext()
                .getExecutorServiceManager()
                .newFixedThreadPool(this, endpoint.getEndpointUri(), 1);

        MongoDbChangeStreamsThread worker = new MongoDbChangeStreamsThread(endpoint, this, filterDocuments);
        changeStreamsThread = worker;
        worker.init();
        executor.execute(worker);
    }

    /**
     * Stops the consumer: after the superclass has stopped, asks the change-streams
     * thread (if one was created) to stop and releases the executor (if one exists).
     *
     * @throws Exception if the superclass stop or the teardown fails
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();

        MongoDbChangeStreamsThread worker = changeStreamsThread;
        if (worker != null) {
            worker.stop();
        }
        releaseExecutor();
    }

    /**
     * Converts the configured stream filter into the list passed to the thread.
     *
     * @param rawFilter filter text from the endpoint; may be empty
     * @return a single-element list holding the parsed document, or {@code null}
     *         when {@link ObjectHelper#isNotEmpty} reports the filter as empty
     */
    private List<BsonDocument> toFilterDocuments(String rawFilter) {
        return ObjectHelper.isNotEmpty(rawFilter)
                ? Collections.singletonList(BsonDocument.parse(rawFilter))
                : null;
    }

    /**
     * Shuts the executor down through the Camel context's executor service manager
     * and clears the field; does nothing when no executor is held.
     */
    private void releaseExecutor() {
        if (executor == null) {
            return;
        }
        endpoint.getCamelContext().getExecutorServiceManager().shutdown(executor);
        // Cleared only after shutdown returns, so a failed shutdown leaves the reference in place.
        executor = null;
    }
}

