/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.ignite.events;

import java.util.Arrays;
import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.ignite.events.IgniteEventsEndpoint;
import org.apache.camel.support.DefaultConsumer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.events.Event;
import org.apache.ignite.lang.IgnitePredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Camel consumer that subscribes a local listener to Ignite events and feeds every
 * received {@link Event} into the Camel route as the body of a new {@link Exchange}.
 *
 * <p>The listener is registered in {@link #doStart()} for the event type ids reported by
 * the endpoint and removed again in {@link #doStop()}.
 */
public class IgniteEventsConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(IgniteEventsConsumer.class);

    /** Endpoint this consumer was created for; supplies the event type ids to listen to. */
    private final IgniteEventsEndpoint endpoint;

    /** Ignite events facade used to register and unregister the local listener. */
    private final IgniteEvents events;

    /** Event type ids the listener is currently registered for; populated in {@link #doStart()}. */
    private int[] eventTypes = new int[0];

    /**
     * Listener handed to Ignite. Each event is wrapped in an exchange and dispatched to the
     * consumer's async processor. Processing failures are logged rather than propagated.
     */
    private final IgnitePredicate<Event> predicate = new IgnitePredicate<Event>() {
        private static final long serialVersionUID = 6738594728074592726L;

        /**
         * Dispatches a single Ignite event into the Camel route.
         *
         * @param event the Ignite event delivered to this listener
         * @return always {@code true}; NOTE(review): per the Ignite {@code localListen}
         *         documentation a {@code false} result unregisters the listener, so
         *         {@code true} keeps the subscription alive even after a failure
         */
        @Override
        public boolean apply(Event event) {
            Exchange exchange = createExchange(true);
            Message in = exchange.getIn();
            in.setBody(event);
            try {
                LOG.trace("Processing Ignite Event: {}.", event);
                AsyncCallback cb = defaultConsumerCallback(exchange, true);
                getAsyncProcessor().process(exchange, cb);
            }
            catch (Exception e) {
                // Trailing Throwable is logged with its stack trace by SLF4J; the event fills the placeholder.
                LOG.error("Exception while processing Ignite Event: {}.", event, e);
            }
            return true;
        }
    };

    /**
     * Creates the consumer.
     *
     * @param endpoint  the endpoint that owns this consumer
     * @param processor the processor that receives the exchanges created for each event
     * @param events    the Ignite events facade the listener is registered on
     */
    public IgniteEventsConsumer(IgniteEventsEndpoint endpoint, Processor processor, IgniteEvents events) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.events = events;
    }

    /**
     * Starts the consumer and registers the local listener for the endpoint's event type ids.
     *
     * @throws Exception if the superclass start or the listener registration fails
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        List<Integer> ids = this.endpoint.getEventsAsIds();
        // Ignite expects a primitive int[] of event type ids, so unbox the list.
        int[] types = new int[ids.size()];
        int counter = 0;
        for (Integer id : ids) {
            types[counter++] = id;
        }
        this.eventTypes = types;
        this.events.localListen(this.predicate, this.eventTypes);
        // Arrays.toString renders the ids; Arrays.asList on an int[] would only print the array identity.
        LOG.info("Started local Ignite Events consumer for events: {}.", Arrays.toString(this.eventTypes));
    }

    /**
     * Stops the consumer and unregisters the local listener for the event type ids
     * recorded by {@link #doStart()}.
     *
     * @throws Exception if the superclass stop or the listener removal fails
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        this.events.stopLocalListen(this.predicate, this.eventTypes);
        LOG.info("Stopped local Ignite Events consumer for events: {}.", Arrays.toString(this.eventTypes));
    }
}

