package org.apache.camel.component.reactive.streams.engine;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
import org.apache.camel.component.reactive.streams.util.MonoPublisher;
import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.function.Suppliers;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/**
 * Default {@link CamelReactiveStreamsService} implementation.
 *
 * <p>The service keeps one {@link CamelPublisher} and one {@link ReactiveStreamsCamelSubscriber} per
 * stream name, created lazily on first use, and remembers which internally generated stream name was
 * assigned to each endpoint URI passed to {@link #from(String)} and {@link #to(String, Object)} so
 * that the bridging route is only added once per URI.
 *
 * <p>It is also exposed as a managed resource offering two JMX operations that list the registered
 * subscribers and publishers.
 */
@ManagedResource(description = "Managed CamelReactiveStreamsService")
public class DefaultCamelReactiveStreamsService extends ServiceSupport implements CamelReactiveStreamsService {
    /** URI prefix of the reactive-streams endpoints used by the bridging routes created here. */
    private static final String STREAM_URI_PREFIX = "reactive-streams:";

    /** Item names of one row of the subscribers table; shared by the row type and the row data. */
    private static final String[] SUBSCRIBER_ITEMS = {"name", "inflight", "requested"};

    /** Item names of one row of a publisher's nested subscriptions table. */
    private static final String[] SUBSCRIPTION_ITEMS = {"name", "buffer size", "back pressure"};

    /** Item names of one row of the publishers table. */
    private static final String[] PUBLISHER_ITEMS = {"name", "subscribers", "subscriptions"};

    private final CamelContext context;
    private final ReactiveStreamsEngineConfiguration configuration;

    /** Created in {@link #doInit()} and released in {@link #doStop()}; handed to every publisher. */
    private ExecutorService workerPool;

    /** Publishers indexed by stream name. */
    private final ConcurrentMap<String, CamelPublisher> publishers = new ConcurrentHashMap<>();

    /** Subscribers indexed by stream name. */
    private final ConcurrentMap<String, ReactiveStreamsCamelSubscriber> subscribers = new ConcurrentHashMap<>();

    /** Endpoint URI to generated stream name, for URIs exposed through {@link #from(String)}. */
    private final ConcurrentMap<String, String> publishedUriToStream = new ConcurrentHashMap<>();

    /** Endpoint URI to generated stream name, for URIs targeted through {@link #to(String, Object)}. */
    private final ConcurrentMap<String, String> requestedUriToStream = new ConcurrentHashMap<>();

    /** Memoizing supplier of the processor appended to routes built by {@link #process(String, Function)}. */
    private final Supplier<UnwrapStreamProcessor> unwrapStreamProcessorSupplier = Suppliers.memorize(UnwrapStreamProcessor::new);

    /**
     * Creates the service and initializes it immediately.
     *
     * @param camelContext the context used to create thread pools, UUIDs and routes
     * @param reactiveStreamsEngineConfiguration the thread pool settings read by {@link #doInit()}
     */
    public DefaultCamelReactiveStreamsService(CamelContext camelContext, ReactiveStreamsEngineConfiguration reactiveStreamsEngineConfiguration) {
        this.context = camelContext;
        this.configuration = reactiveStreamsEngineConfiguration;
        // NOTE(review): init() is inherited and presumably dispatches to doInit(), which creates the
        // worker pool that publishers are constructed with; a subclass overriding doInit() would run
        // before its own constructor body - confirm this is acceptable.
        init();
    }

    /**
     * Returns the identifier of this service.
     *
     * @return {@link ReactiveStreamsConstants#DEFAULT_SERVICE_NAME}
     */
    @Override
    public String getId() {
        return ReactiveStreamsConstants.DEFAULT_SERVICE_NAME;
    }

    /**
     * Returns the context this service was created with.
     *
     * @return the Camel context passed to the constructor
     */
    @Override
    public CamelContext getCamelContext() {
        return this.context;
    }

    /**
     * Creates the worker thread pool from the engine configuration unless one already exists.
     */
    @Override
    public void doInit() {
        if (this.workerPool != null) {
            return;
        }
        this.workerPool = this.context.getExecutorServiceManager().newThreadPool(
                this,
                this.configuration.getThreadPoolName(),
                this.configuration.getThreadPoolMinSize(),
                this.configuration.getThreadPoolMaxSize());
    }

    /**
     * No work is required on start; the worker pool is created in {@link #doInit()}.
     *
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void doStart() throws Exception {
        // nothing to do
    }

    /**
     * Shuts down the worker pool immediately, if present, and forgets it.
     *
     * @throws Exception if the executor service manager fails to shut the pool down
     */
    @Override
    public void doStop() throws Exception {
        if (this.workerPool == null) {
            return;
        }
        this.context.getExecutorServiceManager().shutdownNow(this.workerPool);
        this.workerPool = null;
    }

    /**
     * Returns a publisher of the exchanges sent to the named stream.
     *
     * @param str the stream name
     * @return the stream's publisher wrapped in an {@link UnwrappingPublisher}
     */
    @Override
    public Publisher<Exchange> fromStream(String str) {
        return new UnwrappingPublisher(getPayloadPublisher(str));
    }

    /**
     * Returns a publisher of the named stream whose items are converted to the given type.
     *
     * @param str the stream name
     * @param cls the item type; when it is exactly {@code Exchange.class} no conversion is applied
     * @param <T> the item type
     * @return the exchange publisher itself, or a {@link ConvertingPublisher} around it
     */
    @Override
    @SuppressWarnings("unchecked") // safe: the cast only happens when T is Exchange
    public <T> Publisher<T> fromStream(String str, Class<T> cls) {
        if (Exchange.class.equals(cls)) {
            return (Publisher<T>) fromStream(str);
        }
        return new ConvertingPublisher<>(fromStream(str), cls);
    }

    /**
     * Returns the subscriber registered for the named stream, creating it on first use.
     *
     * @param str the stream name
     * @return the single subscriber instance associated with that name
     */
    @Override
    public ReactiveStreamsCamelSubscriber streamSubscriber(String str) {
        return this.subscribers.computeIfAbsent(str, ReactiveStreamsCamelSubscriber::new);
    }

    /**
     * Returns a subscriber for the named stream that accepts items of the given type.
     *
     * @param str the stream name
     * @param cls the item type; when it is exactly {@code Exchange.class} no conversion is applied
     * @param <T> the item type
     * @return the stream subscriber itself, or a {@link ConvertingSubscriber} around it
     */
    @Override
    @SuppressWarnings("unchecked") // safe: the cast only happens when T is Exchange
    public <T> Subscriber<T> streamSubscriber(String str, Class<T> cls) {
        if (Exchange.class.equals(cls)) {
            Subscriber<?> exchangeSubscriber = streamSubscriber(str);
            return (Subscriber<T>) exchangeSubscriber;
        }
        return new ConvertingSubscriber<>(streamSubscriber(str), this.context, cls);
    }

    /**
     * Publishes an exchange on the named stream.
     *
     * @param str the stream name
     * @param exchange the exchange to publish
     */
    @Override
    public void sendCamelExchange(String str, Exchange exchange) {
        getPayloadPublisher(str).publish(exchange);
    }

    /**
     * Sends the given data to the consumer attached to the named stream.
     *
     * @param str the stream name
     * @param obj the data, converted to an exchange through {@link ReactiveStreamsHelper}
     * @return a publisher of the processed exchange
     * @throws IllegalStateException if no consumer is attached to the stream
     */
    @Override
    public Publisher<Exchange> toStream(String str, Object obj) {
        return doRequest(str, ReactiveStreamsHelper.convertToExchange(this.context, obj));
    }

    /**
     * Returns a function form of {@link #toStream(String, Object)} bound to the named stream.
     *
     * @param str the stream name
     * @return a function mapping data to a publisher of the processed exchange
     */
    @Override
    public Function<?, ? extends Publisher<Exchange>> toStream(String str) {
        return data -> toStream(str, data);
    }

    /**
     * Same as {@link #toStream(String, Object)} with the result converted to the given type.
     *
     * @param str the stream name
     * @param obj the data to send
     * @param cls the result item type
     * @param <T> the result item type
     * @return a {@link ConvertingPublisher} around the exchange publisher
     */
    @Override
    public <T> Publisher<T> toStream(String str, Object obj, Class<T> cls) {
        return new ConvertingPublisher<>(toStream(str, obj), cls);
    }

    /**
     * Hands the exchange to the consumer attached to the named stream and returns a publisher that
     * is fed from the exchange's completion callbacks.
     *
     * @param str the stream name
     * @param exchange the exchange to process
     * @return a publisher that receives the exchange on completion or the failure cause on failure
     * @throws IllegalStateException if no consumer is attached to the stream
     */
    protected Publisher<Exchange> doRequest(String str, Exchange exchange) {
        ReactiveStreamsConsumer consumer = streamSubscriber(str).getConsumer();
        if (consumer == null) {
            throw new IllegalStateException("No consumers attached to the stream " + str);
        }

        final DelayedMonoPublisher<Exchange> result = new DelayedMonoPublisher<>(this.workerPool);
        exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
            @Override
            public void onComplete(Exchange completed) {
                result.setData(completed);
            }

            @Override
            public void onFailure(Exchange failed) {
                Exception cause = failed.getException();
                if (cause == null) {
                    // a failure without a recorded exception still has to surface as an error
                    cause = new IllegalStateException("Unknown Exception");
                }
                result.setException(cause);
            }
        });

        // the callback is intentionally empty: the outcome is delivered by the synchronization above
        consumer.process(exchange, doneSync -> {
        });
        return result;
    }

    /**
     * Returns a function form of {@link #toStream(String, Object, Class)} bound to the named stream.
     *
     * @param str the stream name
     * @param cls the result item type
     * @param <T> the result item type
     * @return a function mapping data to a publisher of the converted result
     */
    @Override
    public <T> Function<Object, Publisher<T>> toStream(String str, Class<T> cls) {
        return data -> toStream(str, data, cls);
    }

    /**
     * Returns the publisher registered for the named stream, creating it on first use.
     *
     * @param str the stream name
     * @return the single publisher instance associated with that name
     */
    private CamelPublisher getPayloadPublisher(String str) {
        return this.publishers.computeIfAbsent(str, name -> new CamelPublisher(this.workerPool, this.context, name));
    }

    /**
     * Adds a route to the Camel context that forwards from one endpoint URI to another.
     *
     * @param fromUri the URI the route consumes from
     * @param toUri the URI the route sends to
     * @throws Exception if the route cannot be added to the context
     */
    private void addRoute(final String fromUri, final String toUri) throws Exception {
        new RouteBuilder() {
            @Override
            public void configure() {
                from(fromUri).to(toUri);
            }
        }.addRoutesToCamelContext(this.context);
    }

    /**
     * Returns a publisher of the exchanges consumed from the given endpoint URI.
     *
     * <p>On first use for a URI, a route from that URI to a newly named reactive stream is added;
     * later calls reuse the same stream.
     *
     * @param str the endpoint URI to consume from
     * @return a publisher of the bridging stream
     * @throws IllegalStateException if the bridging route cannot be created
     */
    @Override
    public Publisher<Exchange> from(String str) {
        // NOTE(review): the route is added inside computeIfAbsent; presumably route creation never
        // re-enters this map for the same key - confirm.
        String stream = this.publishedUriToStream.computeIfAbsent(str, uri -> {
            try {
                String uuid = this.context.getUuidGenerator().generateUuid();
                addRoute(uri, STREAM_URI_PREFIX + uuid);
                return uuid;
            } catch (Exception e) {
                throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + uri, e);
            }
        });
        return fromStream(stream);
    }

    /**
     * Same as {@link #from(String)} with the items converted to the given type.
     *
     * @param str the endpoint URI to consume from
     * @param cls the item type
     * @param <T> the item type
     * @return a {@link ConvertingPublisher} around the exchange publisher
     */
    @Override
    public <T> Publisher<T> from(String str, Class<T> cls) {
        return new ConvertingPublisher<>(from(str), cls);
    }

    /**
     * Returns a subscriber whose received exchanges are routed to the given endpoint URI.
     *
     * <p>Every call adds a new route from a newly named reactive stream to the URI.
     *
     * @param str the endpoint URI to send to
     * @return the subscriber of the newly created stream
     * @throws IllegalStateException if the bridging route cannot be created
     */
    @Override
    public Subscriber<Exchange> subscriber(final String str) {
        try {
            String uuid = this.context.getUuidGenerator().generateUuid();
            addRoute(STREAM_URI_PREFIX + uuid, str);
            return streamSubscriber(uuid);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + str, e);
        }
    }

    /**
     * Same as {@link #subscriber(String)} for items of the given type.
     *
     * @param str the endpoint URI to send to
     * @param cls the item type
     * @param <T> the item type
     * @return a {@link ConvertingSubscriber} around the exchange subscriber
     */
    @Override
    public <T> Subscriber<T> subscriber(String str, Class<T> cls) {
        return new ConvertingSubscriber<>(subscriber(str), this.context, cls);
    }

    /**
     * Sends the given data to the endpoint URI and returns a publisher of the outcome.
     *
     * <p>On first use for a URI, a route from a newly named reactive stream to that URI is added;
     * later calls reuse the same stream.
     *
     * @param str the endpoint URI to send to
     * @param obj the data to send
     * @return a publisher of the processed exchange
     * @throws IllegalStateException if the bridging route cannot be created, or if no consumer is
     *     attached to the bridging stream
     */
    @Override
    public Publisher<Exchange> to(String str, Object obj) {
        // NOTE(review): the route is added inside computeIfAbsent; presumably route creation never
        // re-enters this map for the same key - confirm.
        String stream = this.requestedUriToStream.computeIfAbsent(str, uri -> {
            try {
                String uuid = this.context.getUuidGenerator().generateUuid();
                addRoute(STREAM_URI_PREFIX + uuid, uri);
                return uuid;
            } catch (Exception e) {
                throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + uri, e);
            }
        });
        return toStream(stream, obj);
    }

    /**
     * Returns a function form of {@link #to(String, Object)} bound to the given endpoint URI.
     *
     * @param str the endpoint URI to send to
     * @return a function mapping data to a publisher of the processed exchange
     */
    @Override
    public Function<Object, Publisher<Exchange>> to(String str) {
        return data -> to(str, data);
    }

    /**
     * Same as {@link #to(String, Object)} with the result converted to the given type.
     *
     * @param str the endpoint URI to send to
     * @param obj the data to send
     * @param cls the result item type
     * @param <T> the result item type
     * @return a {@link ConvertingPublisher} around the exchange publisher
     */
    @Override
    public <T> Publisher<T> to(String str, Object obj, Class<T> cls) {
        return new ConvertingPublisher<>(to(str, obj), cls);
    }

    /**
     * Returns a function form of {@link #to(String, Object, Class)} bound to the given endpoint URI.
     *
     * @param str the endpoint URI to send to
     * @param cls the result item type
     * @param <T> the result item type
     * @return a function mapping data to a publisher of the converted result
     */
    @Override
    public <T> Function<Object, Publisher<T>> to(String str, Class<T> cls) {
        return data -> to(str, data, cls);
    }

    /**
     * Adds a route that consumes from the given URI and replaces each message body with the result
     * of applying the function to a single-item publisher of a copy of the exchange.
     *
     * <p>The route then runs the shared {@link UnwrapStreamProcessor}.
     *
     * @param str the endpoint URI to consume from
     * @param function the reactive transformation to apply to each exchange
     * @throws IllegalStateException if the route cannot be added
     */
    @Override
    public void process(final String str, final Function<? super Publisher<Exchange>, ?> function) {
        try {
            new RouteBuilder() {
                @Override
                public void configure() {
                    RouteDefinition route = from(str);
                    route.process(exchange -> {
                        Publisher<Exchange> single = new MonoPublisher<>(exchange.copy());
                        exchange.getIn().setBody(function.apply(single));
                    }).process(DefaultCamelReactiveStreamsService.this.unwrapStreamProcessorSupplier.get());
                }
            }.addRoutesToCamelContext(this.context);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + str, e);
        }
    }

    /**
     * Same as {@link #process(String, Function)} with the publisher items converted to the given type
     * before the function is applied.
     *
     * @param str the endpoint URI to consume from
     * @param cls the item type seen by the function
     * @param function the reactive transformation to apply
     * @param <T> the item type seen by the function
     * @throws IllegalStateException if the route cannot be added
     */
    @Override
    public <T> void process(String str, Class<T> cls, Function<? super Publisher<T>, ?> function) {
        process(str, exchanges -> {
            Publisher<T> converted = new ConvertingPublisher<>(exchanges, cls);
            return function.apply(converted);
        });
    }

    /**
     * Attaches a Camel consumer to the subscriber of the named stream.
     *
     * @param str the stream name
     * @param reactiveStreamsConsumer the consumer to attach
     * @return the stream's subscriber
     */
    @Override
    public ReactiveStreamsCamelSubscriber attachCamelConsumer(String str, ReactiveStreamsConsumer reactiveStreamsConsumer) {
        ReactiveStreamsCamelSubscriber subscriber = streamSubscriber(str);
        subscriber.attachConsumer(reactiveStreamsConsumer);
        return subscriber;
    }

    /**
     * Detaches the Camel consumer from the subscriber of the named stream.
     *
     * @param str the stream name
     */
    @Override
    public void detachCamelConsumer(String str) {
        streamSubscriber(str).detachConsumer();
    }

    /**
     * Attaches a Camel producer to the publisher of the named stream.
     *
     * @param str the stream name
     * @param reactiveStreamsProducer the producer to attach
     */
    @Override
    public void attachCamelProducer(String str, ReactiveStreamsProducer reactiveStreamsProducer) {
        getPayloadPublisher(str).attachProducer(reactiveStreamsProducer);
    }

    /**
     * Detaches the Camel producer from the publisher of the named stream.
     *
     * @param str the stream name
     */
    @Override
    public void detachCamelProducer(String str) {
        getPayloadPublisher(str).detachProducer();
    }

    /**
     * Lists the registered subscribers with their inflight and requested counters.
     *
     * @return a table with one row per stream name
     * @throws RuntimeException if the table cannot be built; checked failures are wrapped through
     *     {@link RuntimeCamelException#wrapRuntimeCamelException(Throwable)}
     */
    @ManagedOperation(description = "Information about Camel Reactive subscribers")
    public TabularData camelSubscribers() {
        try {
            final TabularDataSupport table = new TabularDataSupport(subscribersTabularType());
            // the row type is identical for every entry, so build it once
            final CompositeType rowType = subscribersCompositeType();
            this.subscribers.forEach((name, subscriber) -> {
                try {
                    Object[] values = {
                        name,
                        Long.valueOf(subscriber.getInflightCount()),
                        Long.valueOf(subscriber.getRequested())
                    };
                    table.put(new CompositeDataSupport(rowType, SUBSCRIBER_ITEMS, values));
                } catch (Exception e) {
                    throw RuntimeCamelException.wrapRuntimeCamelException(e);
                }
            });
            return table;
        } catch (Exception e) {
            throw RuntimeCamelException.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Lists the registered publishers, each with its subscription count and a nested table of its
     * subscriptions (id, buffer size and back pressure strategy name).
     *
     * @return a table with one row per stream name
     * @throws RuntimeException if the table cannot be built; checked failures are wrapped through
     *     {@link RuntimeCamelException#wrapRuntimeCamelException(Throwable)}
     */
    @ManagedOperation(description = "Information about Camel Reactive publishers")
    public TabularData camelPublishers() {
        try {
            final TabularDataSupport table = new TabularDataSupport(publishersTabularType());
            // all type descriptors are the same for every entry, so build them once
            final CompositeType publisherRowType = publishersCompositeType();
            final TabularType subscriptionTableType = subscriptionsTabularType();
            final CompositeType subscriptionRowType = subscriptionsCompositeType();
            this.publishers.forEach((name, publisher) -> {
                try {
                    List<CamelSubscription> subscriptions = publisher.getSubscriptions();
                    int subscriberCount = subscriptions.size();

                    TabularDataSupport nested = new TabularDataSupport(subscriptionTableType);
                    for (CamelSubscription subscription : subscriptions) {
                        // a missing strategy is reported as an empty string
                        String strategy = subscription.getBackpressureStrategy() != null
                                ? subscription.getBackpressureStrategy().name()
                                : "";
                        Object[] subscriptionValues = {
                            subscription.getId(),
                            Long.valueOf(subscription.getBufferSize()),
                            strategy
                        };
                        nested.put(new CompositeDataSupport(subscriptionRowType, SUBSCRIPTION_ITEMS, subscriptionValues));
                    }

                    Object[] publisherValues = {name, Integer.valueOf(subscriberCount), nested};
                    table.put(new CompositeDataSupport(publisherRowType, PUBLISHER_ITEMS, publisherValues));
                } catch (Exception e) {
                    throw RuntimeCamelException.wrapRuntimeCamelException(e);
                }
            });
            return table;
        } catch (Exception e) {
            throw RuntimeCamelException.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Describes the subscribers table, indexed by the {@code name} item.
     *
     * @return the tabular type of {@link #camelSubscribers()}
     * @throws OpenDataException if the type definition is rejected
     */
    private static TabularType subscribersTabularType() throws OpenDataException {
        return new TabularType("subscribers", "Information about Camel Reactive subscribers", subscribersCompositeType(), new String[]{"name"});
    }

    /**
     * Describes one row of the subscribers table.
     *
     * @return the composite type with string name and long inflight/requested items
     * @throws OpenDataException if the type definition is rejected
     */
    private static CompositeType subscribersCompositeType() throws OpenDataException {
        // NOTE(review): the type name is "subscriptions" although it describes subscribers; kept
        // as-is because JMX clients may depend on it - confirm whether this is intentional.
        return new CompositeType(
                "subscriptions",
                "Subscriptions",
                SUBSCRIBER_ITEMS,
                new String[]{"Name", "Inflight", "Requested"},
                new OpenType[]{SimpleType.STRING, SimpleType.LONG, SimpleType.LONG});
    }

    /**
     * Describes one row of the publishers table.
     *
     * @return the composite type with string name, integer count and nested subscriptions table
     * @throws OpenDataException if the type definition is rejected
     */
    private static CompositeType publishersCompositeType() throws OpenDataException {
        return new CompositeType(
                "publishers",
                "Publishers",
                PUBLISHER_ITEMS,
                new String[]{"Name", "Subscribers", "Subscriptions"},
                new OpenType[]{SimpleType.STRING, SimpleType.INTEGER, subscriptionsTabularType()});
    }

    /**
     * Describes the nested subscriptions table of a publisher, indexed by the {@code name} item.
     *
     * @return the tabular type of the nested table
     * @throws OpenDataException if the type definition is rejected
     */
    private static TabularType subscriptionsTabularType() throws OpenDataException {
        return new TabularType("subscriptions", "Information about External Reactive subscribers", subscriptionsCompositeType(), new String[]{"name"});
    }

    /**
     * Describes one row of a publisher's nested subscriptions table.
     *
     * @return the composite type with string name, long buffer size and string back pressure items
     * @throws OpenDataException if the type definition is rejected
     */
    private static CompositeType subscriptionsCompositeType() throws OpenDataException {
        return new CompositeType(
                "subscriptions",
                "Subscriptions",
                SUBSCRIPTION_ITEMS,
                new String[]{"Name", "Buffer Size", "Back Pressure"},
                new OpenType[]{SimpleType.STRING, SimpleType.LONG, SimpleType.STRING});
    }

    /**
     * Describes the publishers table, indexed by the {@code name} item.
     *
     * @return the tabular type of {@link #camelPublishers()}
     * @throws OpenDataException if the type definition is rejected
     */
    private static TabularType publishersTabularType() throws OpenDataException {
        return new TabularType("publishers", "Information about Camel Reactive publishers", publishersCompositeType(), new String[]{"name"});
    }
}
