package org.apache.camel.component.reactive.streams.engine;

import java.util.Objects;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.api.DispatchCallback;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/apache/camel/component/reactive/streams/engine/UnwrappingPublisher.class */
public class UnwrappingPublisher implements Publisher<Exchange> {
    private final Publisher<Exchange> delegate;

    public UnwrappingPublisher(Publisher<Exchange> publisher) {
        Objects.requireNonNull(publisher, "delegate publisher cannot be null");
        this.delegate = publisher;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(final Subscriber<? super Exchange> subscriber) {
        this.delegate.subscribe(new Subscriber<Exchange>() { // from class: org.apache.camel.component.reactive.streams.engine.UnwrappingPublisher.1
            private Subscription subscription;

            @Override // org.reactivestreams.Subscriber
            public void onSubscribe(Subscription subscription) {
                if (subscription == null) {
                    throw new NullPointerException("subscription is null");
                }
                if (subscription == this.subscription) {
                    throw new IllegalArgumentException("already subscribed to the subscription: " + String.valueOf(subscription));
                }
                if (this.subscription != null) {
                    subscription.cancel();
                } else {
                    this.subscription = subscription;
                    subscriber.onSubscribe(subscription);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(Exchange exchange) {
                Exception exc = null;
                try {
                    subscriber.onNext(exchange);
                } catch (Exception e) {
                    exc = e;
                }
                DispatchCallback<Exchange> callback = ReactiveStreamsHelper.getCallback(exchange);
                if (callback != null) {
                    callback.processed(exchange, exc);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                subscriber.onError(th);
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                subscriber.onComplete();
            }
        });
    }
}
