package org.apache.camel.rx.support;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.processor.CamelInternalProcessor;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observer;
import rx.Subscription;
import rx.functions.Func1;

/* loaded from: input_file:org/apache/camel/rx/support/EndpointSubscription.class */
/**
 * An Rx {@link Subscription} backed by a Camel {@link Consumer} created on an {@link Endpoint}.
 *
 * <p>Constructing the subscription creates a consumer on the endpoint and starts it;
 * {@link #unsubscribe()} stops that consumer. Any failure while creating or starting the
 * consumer is reported to the observer through {@link Observer#onError(Throwable)} rather
 * than thrown from the constructor.
 *
 * @param <T> the type of item delivered to the observer
 */
public class EndpointSubscription<T> implements Subscription {
    private static final Logger LOG = LoggerFactory.getLogger(EndpointSubscription.class);

    private final Endpoint endpoint;
    private final Observer<? super T> observer;

    /**
     * The consumer created on the endpoint, or {@code null} when creation failed.
     * Declared {@code final} so that the value assigned in the constructor is safely
     * visible to whichever thread later calls {@link #unsubscribe()}.
     */
    private final Consumer consumer;

    /** Flipped to {@code true} by the first call to {@link #unsubscribe()}. */
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    /**
     * Creates a consumer on the given endpoint and starts it.
     *
     * @param endpoint the endpoint to consume from
     * @param observer the observer handed to the {@code ProcessorToObserver}; also notified
     *                 via {@code onError} if the consumer cannot be created or started
     * @param func1    the function handed to the {@code ProcessorToObserver}
     *                 (presumably converts each exchange to the emitted item; confirm there)
     */
    public EndpointSubscription(Endpoint endpoint, Observer<? super T> observer, Func1<Exchange, T> func1) {
        this.endpoint = endpoint;
        this.observer = observer;

        // Wrap the observer-facing processor and attach the unit-of-work advice to it.
        CamelInternalProcessor processor = new CamelInternalProcessor(new ProcessorToObserver(func1, observer));
        processor.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice((RouteContext) null));

        Consumer created = null;
        try {
            created = endpoint.createConsumer(processor);
            ServiceHelper.startService(created);
        } catch (Exception e) {
            observer.onError(e);
        }
        // Keep the reference even when starting failed, so unsubscribe() still stops
        // a consumer that was created but did not start cleanly.
        this.consumer = created;
    }

    @Override
    public String toString() {
        return "EndpointSubscription[" + endpoint + " observer: " + observer + "]";
    }

    /**
     * Stops the underlying consumer. Only the first call has any effect; failures while
     * stopping are logged at WARN level and otherwise ignored.
     */
    @Override
    public void unsubscribe() {
        // Always flip the flag first so isUnsubscribed() reports true even without a consumer.
        if (!unsubscribed.compareAndSet(false, true)) {
            return;
        }
        if (consumer == null) {
            return;
        }
        try {
            ServiceHelper.stopServices(new Object[]{consumer});
        } catch (Exception e) {
            LOG.warn("Error stopping consumer: {} due {}. This exception is ignored.", consumer, e.getMessage(), e);
        }
    }

    /**
     * @return {@code true} once {@link #unsubscribe()} has been called
     */
    @Override
    public boolean isUnsubscribed() {
        return unsubscribed.get();
    }

    /**
     * @return the endpoint this subscription was created for
     */
    public Endpoint getEndpoint() {
        return endpoint;
    }

    /**
     * @return the observer this subscription was created for
     */
    public Observer<? super T> getObserver() {
        return observer;
    }
}
