/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.rx;

import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Service;
import org.apache.camel.processor.PipelineHelper;
import org.apache.camel.rx.RuntimeCamelRxException;
import org.apache.camel.util.ServiceHelper;
import rx.Observable;
import rx.Subscriber;

/**
 * RxJava operator that sends every upstream {@link Exchange} to a Camel
 * {@link Endpoint} and emits a follow-up exchange (built with
 * {@link PipelineHelper#createNextExchange(Exchange)}) downstream.
 *
 * <p>A single {@link ProducerTemplate} is created and started in the constructor
 * and is shared by every subscriber produced by {@link #call(Subscriber)}. It is
 * stopped and discarded as soon as one of those subscribers reaches a terminal
 * event (completion, upstream error, or a failed exchange). Exchanges processed
 * after that point are reported as failed with an {@link IllegalStateException}.
 */
public class CamelOperator
implements Observable.Operator<Exchange, Exchange> {
    /** Template used to send exchanges; {@code null} once it has been released. */
    private ProducerTemplate producerTemplate;
    /** Endpoint every exchange is sent to. */
    private final Endpoint endpoint;

    /**
     * Creates an operator that sends exchanges to the endpoint resolved from {@code uri}.
     *
     * @param context the Camel context used to create the producer template and resolve the endpoint
     * @param uri     the URI of the target endpoint
     * @throws Exception if the producer template cannot be started
     */
    public CamelOperator(CamelContext context, String uri) throws Exception {
        this.producerTemplate = context.createProducerTemplate();
        this.endpoint = context.getEndpoint(uri);
        ServiceHelper.startService((Service)this.producerTemplate);
    }

    /**
     * Creates an operator that sends exchanges to the given endpoint.
     *
     * @param endpoint the target endpoint; its Camel context supplies the producer template
     * @throws Exception if the producer template cannot be started
     */
    public CamelOperator(Endpoint endpoint) throws Exception {
        this.producerTemplate = endpoint.getCamelContext().createProducerTemplate();
        this.endpoint = endpoint;
        ServiceHelper.startService((Service)this.producerTemplate);
    }

    /**
     * Wraps the downstream subscriber {@code s} in a subscriber that routes each
     * incoming exchange through the endpoint before passing the result on.
     *
     * @param s the downstream subscriber
     * @return the subscriber to attach to the upstream observable
     */
    @Override
    public Subscriber<? super Exchange> call(final Subscriber<? super Exchange> s) {
        return new Subscriber<Exchange>(s){
            // Set once a terminal event has been handled so that nothing is processed
            // or forwarded afterwards. Plain field: assumes upstream serializes its
            // events as the Observable contract requires -- TODO confirm for all sources.
            private boolean done;

            @Override
            public void onCompleted() {
                if (this.done) {
                    return;
                }
                this.done = true;
                try {
                    CamelOperator.this.releaseProducerTemplate();
                }
                catch (Exception e) {
                    throw new RuntimeCamelRxException(e);
                }
                if (!s.isUnsubscribed()) {
                    s.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (this.done) {
                    return;
                }
                this.done = true;
                this.releaseAndReport(e);
            }

            @Override
            public void onNext(Exchange item) {
                if (this.done || s.isUnsubscribed()) {
                    return;
                }
                Exchange exchange = CamelOperator.this.process(item);
                Throwable failure = exchange.getException();
                if (failure == null) {
                    s.onNext(PipelineHelper.createNextExchange((Exchange)exchange));
                    return;
                }
                // A failed exchange terminates the stream: report it once, then stop
                // upstream from pushing further items into a finished subscriber.
                this.done = true;
                this.releaseAndReport(failure);
                this.unsubscribe();
            }

            /**
             * Stops the producer template and then forwards {@code error} downstream.
             * A failure while stopping is attached as a suppressed exception so that it
             * neither hides the original error nor gets lost.
             */
            private void releaseAndReport(Throwable error) {
                try {
                    CamelOperator.this.releaseProducerTemplate();
                }
                catch (Exception stopFailure) {
                    if (error != stopFailure) {
                        error.addSuppressed(stopFailure);
                    }
                }
                if (!s.isUnsubscribed()) {
                    s.onError(error);
                }
            }
        };
    }

    /**
     * Stops the producer template, if it is still held, and drops the reference.
     * The reference is cleared before stopping so it is discarded even when
     * stopping throws.
     *
     * @throws Exception if stopping the producer template fails
     */
    private void releaseProducerTemplate() throws Exception {
        ProducerTemplate template = this.producerTemplate;
        this.producerTemplate = null;
        if (template != null) {
            ServiceHelper.stopService((Object)template);
        }
    }

    /**
     * Sends {@code exchange} to the endpoint. Any exception thrown while sending is
     * recorded on the exchange instead of being propagated.
     *
     * @param exchange the exchange to send
     * @return the exchange returned by the producer template, or the input exchange
     *         carrying the failure when sending was not possible
     */
    private Exchange process(Exchange exchange) {
        ProducerTemplate template = this.producerTemplate;
        if (template == null) {
            // The shared template was already released by an earlier terminal event.
            exchange.setException((Throwable)new IllegalStateException(
                    "Producer template has already been stopped; cannot send to " + this.endpoint));
            return exchange;
        }
        try {
            exchange = template.send(this.endpoint, exchange);
        }
        catch (Exception e) {
            exchange.setException((Throwable)e);
        }
        return exchange;
    }
}

