/*
 * Decompiled with CFR 0.152.
 */
package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.implementation.instrumentation.ReceiverKind;
import com.azure.messaging.servicebus.implementation.instrumentation.ServiceBusReceiverInstrumentation;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.apache.qpid.proton.message.Message;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/**
 * Subscriber placed between an upstream {@link Flux} and a downstream {@link CoreSubscriber} that
 * hands every element to an instrumentation callback; the callback decides when the element is
 * delivered downstream by invoking the handler it is given.
 *
 * <p>Instances are only created through {@link #create(Flux, ServiceBusReceiverInstrumentation)}.
 * Subscription, error and completion signals are forwarded to the downstream subscriber unchanged.
 *
 * @param <T> type of the elements flowing through the operator
 */
final class TracingFluxOperator<T> extends BaseSubscriber<T> {
    /** Subscriber that ultimately receives the signals. */
    private final CoreSubscriber<? super T> downstream;
    /**
     * Callback invoked for each element. Its second argument is the handler that pushes the
     * element to {@link #downstream}; the callback is expected to invoke it.
     */
    private final BiConsumer<T, Function<T, Throwable>> instrumentation;

    /**
     * Wraps {@code upstream} so that each emitted element is passed to
     * {@code instrumentation.instrumentProcess(...)} with {@link ReceiverKind#ASYNC_RECEIVER}.
     *
     * <p>{@code upstream} is returned as-is when {@code instrumentation.isEnabled()} is
     * {@code false} and {@code instrumentation.isAsyncReceiverInstrumentation()} is {@code true}.
     *
     * @param upstream the source sequence to wrap
     * @param instrumentation the receiver instrumentation used to process each element
     * @param <T> element type; {@link Message} and {@link ServiceBusReceivedMessage} elements are
     *     instrumented, any other element is forwarded downstream without instrumentation
     * @return {@code upstream} itself, or a new {@link Flux} that instruments every element
     */
    public static <T> Flux<T> create(Flux<T> upstream, final ServiceBusReceiverInstrumentation instrumentation) {
        if (!instrumentation.isEnabled() && instrumentation.isAsyncReceiverInstrumentation()) {
            return upstream;
        }
        return new FluxOperator<T, T>(upstream) {

            // The casts below are guarded by the instanceof checks on the element itself: when
            // the element is a Message (or ServiceBusReceivedMessage) the handler, which simply
            // forwards that same element downstream, can safely be viewed as accepting that type.
            @SuppressWarnings("unchecked")
            public void subscribe(CoreSubscriber<? super T> actual) {
                Objects.requireNonNull(actual, "'actual' cannot be null.");
                this.source.subscribe(new TracingFluxOperator<T>(actual, (msg, handler) -> {
                    if (msg instanceof Message) {
                        instrumentation.instrumentProcess((Message) msg, ReceiverKind.ASYNC_RECEIVER, (Function<Message, Throwable>) handler);
                    } else if (msg instanceof ServiceBusReceivedMessage) {
                        instrumentation.instrumentProcess((ServiceBusReceivedMessage) msg, ReceiverKind.ASYNC_RECEIVER, (Function<ServiceBusReceivedMessage, Throwable>) handler);
                    } else {
                        // Unknown element type: there is nothing to instrument, but the element
                        // must still reach the downstream subscriber. Without this branch it was
                        // silently dropped even though upstream demand had been consumed for it.
                        handler.apply(msg);
                    }
                }));
            }
        };
    }

    /**
     * @param downstream subscriber that receives the forwarded signals
     * @param instrumentation callback invoked for each element together with the handler that
     *     delivers the element to {@code downstream}
     */
    private TracingFluxOperator(CoreSubscriber<? super T> downstream, BiConsumer<T, Function<T, Throwable>> instrumentation) {
        this.downstream = downstream;
        this.instrumentation = instrumentation;
    }

    /**
     * Exposes the downstream subscriber's context as this subscriber's context.
     *
     * @return the {@link Context} reported by the downstream subscriber
     */
    public Context currentContext() {
        return this.downstream.currentContext();
    }

    /**
     * Hands this instance to the downstream subscriber as its {@link Subscription}, so that
     * downstream {@code request}/{@code cancel} calls go through this subscriber.
     *
     * @param subscription the upstream subscription (not used directly here)
     */
    protected void hookOnSubscribe(Subscription subscription) {
        this.downstream.onSubscribe(this);
    }

    /**
     * Passes the element to the instrumentation callback along with a handler that emits it to
     * the downstream subscriber.
     *
     * @param message the element received from upstream
     */
    protected void hookOnNext(T message) {
        this.instrumentation.accept(message, msg -> {
            this.downstream.onNext(msg);
            // The handler contract returns a Throwable; forwarding reports none.
            return null;
        });
    }

    /**
     * Forwards the error signal to the downstream subscriber.
     *
     * @param throwable the error received from upstream
     */
    protected void hookOnError(Throwable throwable) {
        this.downstream.onError(throwable);
    }

    /** Forwards the completion signal to the downstream subscriber. */
    protected void hookOnComplete() {
        this.downstream.onComplete();
    }
}

