/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rxjava.engine;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableSubscriber;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.MulticastProcessor;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.Exchange;
import org.apache.camel.component.reactive.streams.ReactiveStreamsBackpressureStrategy;
import org.apache.camel.component.reactive.streams.ReactiveStreamsDiscardedException;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.component.rxjava.engine.RxJavaStreamsService;
import org.apache.camel.util.ObjectHelper;
import org.reactivestreams.Publisher;

/**
 * Bridges a single Camel {@link ReactiveStreamsProducer} to a reactive-streams
 * {@link Publisher} of {@link Exchange} objects for the stream identified by {@code name}.
 *
 * <p>Exchanges handed to {@link #send(Exchange)} are pushed into the {@link FlowableEmitter}
 * captured when a producer is attached, flow through the back-pressure operator selected from
 * the producer endpoint's strategy, and are finally delivered to the internal
 * {@link MulticastProcessor} that is exposed through {@link #getPublisher()}.
 *
 * <p>{@link #attach(ReactiveStreamsProducer)} and {@link #detach()} are {@code synchronized} on
 * this instance; {@link #send(Exchange)} is not and only reads the emitter through an
 * {@link AtomicReference}.
 */
final class RxJavaCamelProcessor implements Closeable {
    /** Stream name; used in error messages and in discard notifications. */
    private final String name;
    /** Owning service; consulted via {@code isRunAllowed()} before sending and dispatching. */
    private final RxJavaStreamsService service;
    /** Emitter of the currently attached flow, or {@code null} when no producer is attached. */
    private final AtomicReference<FlowableEmitter<Exchange>> camelEmitter;
    /** Processor the attached flow is subscribed to; returned as-is by {@link #getPublisher()}. */
    private final FlowableProcessor<Exchange> publisher;
    /** Currently attached producer, or {@code null}; only written inside synchronized methods. */
    private ReactiveStreamsProducer camelProducer;

    /**
     * Creates a processor with no producer attached.
     *
     * @param service the owning streams service
     * @param name    the stream name
     */
    RxJavaCamelProcessor(RxJavaStreamsService service, String name) {
        this.service = service;
        this.name = name;
        this.camelProducer = null;
        this.camelEmitter = new AtomicReference<>();
        // The argument is passed to MulticastProcessor.create(int); presumably the
        // processor's internal buffer size - see the RxJava documentation.
        this.publisher = MulticastProcessor.create(1);
    }

    /**
     * Detaches the current producer, if any. Equivalent to {@link #detach()}.
     *
     * @throws IOException declared by {@link Closeable}; not thrown by the code in this method
     */
    @Override
    public void close() throws IOException {
        this.detach();
    }

    /**
     * Returns the publisher that downstream subscribers use to receive exchanges.
     *
     * @return the internal processor, viewed as a {@link Publisher}
     */
    Publisher<Exchange> getPublisher() {
        return this.publisher;
    }

    /**
     * Attaches a producer and wires a new emitter-backed flow into the publisher, applying the
     * back-pressure strategy configured on the producer's endpoint.
     *
     * @param producer the producer to attach; must not be {@code null}
     * @throws NullPointerException  if {@code producer} is {@code null}
     * @throws IllegalStateException if a producer is already attached to this stream
     */
    synchronized void attach(ReactiveStreamsProducer producer) {
        Objects.requireNonNull(producer, "producer cannot be null, use the detach method");
        if (this.camelProducer != null) {
            throw new IllegalStateException("A producer is already attached to the stream '" + this.name + "'");
        }

        // At this point camelProducer is null and producer is non-null, so the two can never be
        // the same reference: the wiring below is unconditional. Resetting first guarantees no
        // stale emitter is left visible to send() while the new flow is being set up.
        this.detach();

        ReactiveStreamsBackpressureStrategy strategy = producer.getEndpoint().getBackpressureStrategy();

        // The emitter itself applies no back-pressure handling (MISSING); one of the
        // onBackpressureXxx operators below is always placed downstream of it.
        Flowable<Exchange> source = Flowable.create(this.camelEmitter::set, BackpressureStrategy.MISSING);

        Flowable<Exchange> pipeline;
        if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.OLDEST)) {
            // Dropped exchanges are reported through onBackPressure; only exchanges that pass
            // the drop operator reach onItemEmitted.
            pipeline = source.onBackpressureDrop(this::onBackPressure).doAfterNext(this::onItemEmitted);
        } else if (ObjectHelper.equal(strategy, ReactiveStreamsBackpressureStrategy.LATEST)) {
            // onItemEmitted is registered upstream of the back-pressure operator here.
            pipeline = source.doAfterNext(this::onItemEmitted).onBackpressureLatest();
        } else {
            // Any other strategy (including null) falls back to buffering.
            pipeline = source.doAfterNext(this::onItemEmitted).onBackpressureBuffer();
        }

        // NOTE(review): detach() neither completes nor cancels a previously created emitter, so
        // an attach after a detach subscribes a second flow to the same processor - verify that
        // the processor accepts a second upstream subscription.
        pipeline.subscribe((FlowableSubscriber<Exchange>) this.publisher);

        this.camelProducer = producer;
    }

    /**
     * Forgets the attached producer and its emitter. Safe to call when nothing is attached.
     */
    synchronized void detach() {
        this.camelProducer = null;
        this.camelEmitter.set(null);
    }

    /**
     * Emits an exchange into the attached flow. Does nothing when the service reports that
     * running is not allowed.
     *
     * @param exchange the exchange to emit
     * @throws RuntimeException raised by {@code ObjectHelper.notNull} when no emitter is present
     *                          (presumably an {@code IllegalArgumentException} - confirm against
     *                          Camel's {@code ObjectHelper})
     */
    void send(Exchange exchange) {
        if (!this.service.isRunAllowed()) {
            return;
        }
        FlowableEmitter<Exchange> emitter = ObjectHelper.notNull(this.camelEmitter.get(), "FlowableEmitter");
        emitter.onNext(exchange);
    }

    /**
     * Invokes the exchange's dispatch callback without an error after the exchange has been
     * emitted, provided the service still allows running.
     */
    private void onItemEmitted(Exchange exchange) {
        if (this.service.isRunAllowed()) {
            ReactiveStreamsHelper.invokeDispatchCallback(exchange);
        }
    }

    /**
     * Invokes the exchange's dispatch callback with a {@link ReactiveStreamsDiscardedException}
     * for an exchange dropped by the back-pressure operator.
     */
    private void onBackPressure(Exchange exchange) {
        ReactiveStreamsHelper.invokeDispatchCallback(exchange, new ReactiveStreamsDiscardedException("Discarded by back pressure strategy", exchange, this.name));
    }
}

