/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.ignite.messaging;

import java.util.UUID;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.ignite.messaging.IgniteMessagingEndpoint;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Camel consumer that listens on the endpoint's Ignite messaging topic and forwards every
 * received message to the configured {@link Processor} as an {@code InOnly} exchange.
 *
 * <p>The listener is registered with {@link IgniteMessaging#localListen} when the consumer
 * starts and removed with {@link IgniteMessaging#stopLocalListen} when it stops.
 */
public class IgniteMessagingConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(IgniteMessagingConsumer.class);

    private final IgniteMessagingEndpoint endpoint;
    private final IgniteMessaging messaging;

    /**
     * Listener handed to Ignite. It is a thin adapter: all work happens in
     * {@link #deliver(UUID, Object)} so the anonymous class holds no logic of its own.
     */
    private final IgniteBiPredicate<UUID, Object> predicate = new IgniteBiPredicate<UUID, Object>(){
        private static final long serialVersionUID = -971933058406324501L;

        @Override
        public boolean apply(UUID uuid, Object payload) {
            return IgniteMessagingConsumer.this.deliver(uuid, payload);
        }
    };

    /**
     * Creates a consumer bound to the given endpoint.
     *
     * @param endpoint  endpoint supplying the topic name and the exchange factory
     * @param processor processor that receives one exchange per Ignite message
     * @param messaging Ignite messaging facade used to register and remove the listener
     */
    public IgniteMessagingConsumer(IgniteMessagingEndpoint endpoint, Processor processor, IgniteMessaging messaging) {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.messaging = messaging;
    }

    /**
     * Registers the listener on the endpoint's topic after the base consumer has started.
     *
     * @throws Exception if the base consumer or the listener registration fails
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        this.messaging.localListen(this.endpoint.getTopic(), this.predicate);
        LOG.info("Started Ignite Messaging consumer for topic {}.", this.endpoint.getTopic());
    }

    /**
     * Removes the listener from the endpoint's topic after the base consumer has stopped.
     *
     * @throws Exception if the base consumer or the listener removal fails
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        this.messaging.stopLocalListen(this.endpoint.getTopic(), this.predicate);
        LOG.info("Stopped Ignite Messaging consumer for topic {}.", this.endpoint.getTopic());
    }

    /**
     * Wraps one Ignite message in an exchange and passes it to the processor.
     *
     * <p>Failures are logged and never propagated to the caller. This covers both exceptions
     * thrown by the processor and exceptions the processor only recorded on the exchange.
     *
     * @param uuid    first argument Ignite passed to the listener; copied into the
     *                {@code CamelIgniteMessagingUUID} header
     * @param payload message body as delivered by Ignite
     * @return always {@code true}; per the Ignite {@code localListen} documentation, returning
     *         {@code false} would unregister the listener
     */
    private boolean deliver(UUID uuid, Object payload) {
        Exchange exchange = this.newExchangeFor(uuid, payload);
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing Ignite message for subscription {} with payload {}.", uuid, payload);
            }
            this.getProcessor().process(exchange);

            // A processor may record a failure on the exchange instead of throwing it;
            // without this check such failures would go completely unreported here.
            Exception recorded = exchange.getException();
            if (recorded != null) {
                this.logProcessingFailure(recorded);
            }
        }
        catch (Exception e) {
            this.logProcessingFailure(e);
        }
        return true;
    }

    /**
     * Builds the {@code InOnly} exchange for a received message: the payload becomes the body,
     * and the topic and UUID are exposed as headers.
     */
    private Exchange newExchangeFor(UUID uuid, Object payload) {
        Exchange exchange = this.endpoint.createExchange(ExchangePattern.InOnly);
        Message in = exchange.getIn();
        in.setBody(payload);
        in.setHeader("CamelIgniteMessagingTopic", this.endpoint.getTopic());
        in.setHeader("CamelIgniteMessagingUUID", uuid);
        return exchange;
    }

    /** Logs a processing failure for the endpoint's topic at error level, with the stack trace. */
    private void logProcessingFailure(Throwable cause) {
        LOG.error(String.format("Exception while processing Ignite Message from topic %s", this.endpoint.getTopic()), cause);
    }
}

