package org.apache.camel.component.ignite.cache;

import java.util.Iterator;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.ignite.IgniteConstants;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.EmptyAsyncCallback;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/ignite/cache/IgniteCacheContinuousQueryConsumer.class */
public class IgniteCacheContinuousQueryConsumer extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(IgniteCacheContinuousQueryConsumer.class);
    private IgniteCacheEndpoint endpoint;
    private IgniteCache<Object, Object> cache;
    private QueryCursor<Cache.Entry<Object, Object>> cursor;

    public IgniteCacheContinuousQueryConsumer(IgniteCacheEndpoint igniteCacheEndpoint, Processor processor) {
        super(igniteCacheEndpoint, processor);
        this.endpoint = igniteCacheEndpoint;
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.cache = this.endpoint.obtainCache();
        launchContinuousQuery();
        LOG.info("Started Ignite Cache Continuous Query consumer for cache {} with query: {}.", this.cache.getName(), this.endpoint.getQuery());
        maybeFireExistingQueryResults();
    }

    private void maybeFireExistingQueryResults() {
        if (!this.endpoint.isFireExistingQueryResults()) {
            LOG.info(String.format("Skipping existing cache results for cache name = %s.", this.endpoint.getCacheName()));
            return;
        }
        LOG.info(String.format("Processing existing cache results for cache name = %s.", this.endpoint.getCacheName()));
        Iterator it = this.cursor.iterator();
        while (it.hasNext()) {
            Cache.Entry entry = (Cache.Entry) it.next();
            createExchange(entry.getValue()).getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, entry.getKey());
            getAsyncProcessor().process(createExchange(entry), new AsyncCallback() { // from class: org.apache.camel.component.ignite.cache.IgniteCacheContinuousQueryConsumer.1
                public void done(boolean z) {
                }
            });
        }
    }

    private void launchContinuousQuery() {
        ContinuousQuery continuousQuery = new ContinuousQuery();
        if (this.endpoint.getQuery() != null) {
            continuousQuery.setInitialQuery(this.endpoint.getQuery());
        }
        if (this.endpoint.getRemoteFilter() != null) {
            continuousQuery.setRemoteFilter(this.endpoint.getRemoteFilter());
        }
        continuousQuery.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { // from class: org.apache.camel.component.ignite.cache.IgniteCacheContinuousQueryConsumer.2
            public void onUpdated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) throws CacheEntryListenerException {
                if (IgniteCacheContinuousQueryConsumer.LOG.isTraceEnabled()) {
                    IgniteCacheContinuousQueryConsumer.LOG.info("Processing Continuous Query event(s): {}.", iterable);
                }
                if (!IgniteCacheContinuousQueryConsumer.this.endpoint.isOneExchangePerUpdate()) {
                    IgniteCacheContinuousQueryConsumer.this.fireGroupedExchange(iterable);
                    return;
                }
                Iterator<CacheEntryEvent<? extends Object, ? extends Object>> it = iterable.iterator();
                while (it.hasNext()) {
                    IgniteCacheContinuousQueryConsumer.this.fireSingleExchange(it.next());
                }
            }
        });
        continuousQuery.setAutoUnsubscribe(this.endpoint.isAutoUnsubscribe());
        continuousQuery.setPageSize(this.endpoint.getPageSize());
        continuousQuery.setTimeInterval(this.endpoint.getTimeInterval());
        this.cursor = this.cache.query(continuousQuery);
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.cursor.close();
        LOG.info("Stopped Ignite Cache Continuous Query consumer for cache {} with query: {}.", this.cache.getName(), this.endpoint.getQuery());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fireSingleExchange(CacheEntryEvent<? extends Object, ? extends Object> cacheEntryEvent) {
        Exchange createExchange = createExchange(cacheEntryEvent.getValue());
        createExchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_EVENT_TYPE, cacheEntryEvent.getEventType());
        createExchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_OLD_VALUE, cacheEntryEvent.getOldValue());
        createExchange.getIn().setHeader(IgniteConstants.IGNITE_CACHE_KEY, cacheEntryEvent.getKey());
        getAsyncProcessor().process(createExchange, EmptyAsyncCallback.get());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void fireGroupedExchange(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> iterable) {
        getAsyncProcessor().process(createExchange(iterable), EmptyAsyncCallback.get());
    }

    private Exchange createExchange(Object obj) {
        Exchange createExchange = this.endpoint.createExchange(ExchangePattern.InOnly);
        Message in = createExchange.getIn();
        in.setBody(obj);
        in.setHeader(IgniteConstants.IGNITE_CACHE_NAME, this.endpoint.getCacheName());
        return createExchange;
    }
}
