package org.apache.camel.component.reactive.streams.util;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Exchange;
import org.apache.camel.TypeConversionException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/reactive/streams/util/ConvertingPublisher.class */
/**
 * A {@link Publisher} that adapts a publisher of {@link Exchange}s into a publisher of items of
 * type {@code R} by passing every exchange through the {@link BodyConverter} obtained from
 * {@code BodyConverter.forType(type)}.
 *
 * <p>When an exchange carries a non-null body and the converter yields {@code null} (or throws a
 * {@link TypeConversionException}), the downstream subscriber receives {@code onError} with a
 * {@link ClassCastException}, the upstream subscription is cancelled and every later signal from
 * upstream is dropped.
 *
 * @param <R> the item type delivered to subscribers
 */
public class ConvertingPublisher<R> implements Publisher<R> {
    private static final Logger LOG = LoggerFactory.getLogger(ConvertingPublisher.class);

    /** Upstream source of exchanges. */
    private final Publisher<Exchange> delegate;

    /** Target item type; used for the converter lookup and in diagnostics. */
    private final Class<R> type;

    /** Turns an exchange into an item of type {@code R}. */
    private final BodyConverter<R> converter;

    /**
     * Creates a publisher that converts the exchanges emitted by {@code publisher} to {@code cls}.
     *
     * @param publisher the upstream publisher of exchanges
     * @param cls the type each exchange is converted to
     * @throws NullPointerException if {@code publisher} or {@code cls} is {@code null}
     */
    public ConvertingPublisher(Publisher<Exchange> publisher, Class<R> cls) {
        Objects.requireNonNull(publisher, "delegate publisher cannot be null");
        Objects.requireNonNull(cls, "type cannot be null");
        this.delegate = publisher;
        this.type = cls;
        this.converter = BodyConverter.forType(cls);
    }

    /**
     * Subscribes {@code subscriber} to the converted stream by registering an adapting subscriber
     * on the delegate publisher.
     *
     * @param subscriber the downstream subscriber that receives the converted items
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override
    public void subscribe(final Subscriber<? super R> subscriber) {
        // Fail fast here instead of inside a later upstream callback
        // (Reactive Streams rule 1.9: subscribe must throw NPE for a null subscriber).
        Objects.requireNonNull(subscriber, "subscriber cannot be null");

        this.delegate.subscribe(new Subscriber<Exchange>() {
            /** Flipped to {@code false} once a conversion failure has terminated the stream. */
            private final AtomicBoolean active = new AtomicBoolean(true);

            /** The upstream subscription accepted by this adapter; {@code null} until subscribed. */
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) {
                if (subscription == null) {
                    throw new NullPointerException("subscription is null");
                }
                if (subscription == this.subscription) {
                    throw new IllegalArgumentException("already subscribed to the subscription: " + subscription);
                }
                if (this.subscription != null) {
                    // A different subscription is already in place: reject the newcomer.
                    subscription.cancel();
                } else {
                    this.subscription = subscription;
                    subscriber.onSubscribe(subscription);
                }
            }

            @Override
            public void onNext(Exchange exchange) {
                if (!this.active.get()) {
                    return;
                }

                R converted;
                try {
                    converted = converter.apply(exchange);
                } catch (TypeConversionException e) {
                    LOG.warn("Unable to convert body to the specified type: {}", type.getName(), e);
                    converted = null;
                }

                if (converted != null || exchange.getIn().getBody() == null) {
                    // NOTE(review): a null body is forwarded as a null item; confirm downstream
                    // subscribers tolerate null in onNext before relying on this.
                    subscriber.onNext(converted);
                    return;
                }

                // Non-null body that could not be converted: terminate the stream exactly once.
                // The flag is flipped first and the cancel runs in finally so that a throwing
                // downstream onError cannot leave the adapter active or the upstream uncancelled.
                if (this.active.compareAndSet(true, false)) {
                    try {
                        subscriber.onError(new ClassCastException(
                                "Unable to convert body to the specified type: " + type.getName()));
                    } finally {
                        this.subscription.cancel();
                    }
                }
            }

            @Override
            public void onError(Throwable th) {
                if (this.active.get()) {
                    subscriber.onError(th);
                }
            }

            @Override
            public void onComplete() {
                if (this.active.get()) {
                    subscriber.onComplete();
                }
            }
        });
    }
}
