package io.smallrye.reactive.messaging.connectors;

import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.connectors.i18n.InMemoryExceptions;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.Processor;

@ApplicationScoped
@Connector(InMemoryConnector.CONNECTOR)
/* loaded from: input_file:io/smallrye/reactive/messaging/connectors/InMemoryConnector.class */
public class InMemoryConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    public static final String CONNECTOR = "smallrye-in-memory";
    private final Map<String, InMemorySourceImpl<?>> sources = new HashMap();
    private final Map<String, InMemorySinkImpl<?>> sinks = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/connectors/InMemoryConnector$InMemorySinkImpl.class */
    public static class InMemorySinkImpl<T> implements InMemorySink<T> {
        private final SubscriberBuilder<? extends Message<T>, Void> sink;
        private final List<Message<T>> list;
        private final AtomicReference<Throwable> failure;
        private final AtomicBoolean completed;
        private final String name;

        private InMemorySinkImpl(String str) {
            this.list = new CopyOnWriteArrayList();
            this.failure = new AtomicReference<>();
            this.completed = new AtomicBoolean();
            this.name = str;
            this.sink = ReactiveStreams.builder().flatMapCompletionStage(message -> {
                this.list.add(message);
                return message.ack().thenApply(r3 -> {
                    return message;
                });
            }).onError(th -> {
                this.failure.compareAndSet(null, th);
            }).onComplete(() -> {
                this.completed.compareAndSet(false, true);
            }).ignore();
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySink
        public String name() {
            return this.name;
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySink
        public List<? extends Message<T>> received() {
            return new ArrayList(this.list);
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySink
        public void clear() {
            this.completed.set(false);
            this.failure.set(null);
            this.list.clear();
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySink
        public boolean hasCompleted() {
            return this.completed.get();
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySink
        public boolean hasFailed() {
            return getFailure() != null;
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySink
        public Throwable getFailure() {
            return this.failure.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/connectors/InMemoryConnector$InMemorySourceImpl.class */
    public static class InMemorySourceImpl<T> implements InMemorySource<T> {
        private final Processor<Message<T>, Message<T>> processor;
        private final PublisherBuilder<? extends Message<T>> source;
        private final String name;

        private InMemorySourceImpl(String str, boolean z) {
            this.name = str;
            if (z) {
                this.processor = BroadcastProcessor.create();
            } else {
                this.processor = UnicastProcessor.create();
            }
            this.source = ReactiveStreams.fromPublisher(this.processor);
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySource
        public String name() {
            return this.name;
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySource
        public InMemorySource<T> send(T t) {
            if (t instanceof Message) {
                this.processor.onNext((Message) t);
            } else {
                this.processor.onNext(Message.of(t));
            }
            return this;
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySource
        public void complete() {
            this.processor.onComplete();
        }

        @Override // io.smallrye.reactive.messaging.connectors.InMemorySource
        public void fail(Throwable th) {
            this.processor.onError(th);
        }
    }

    public static Map<String, String> switchIncomingChannelsToInMemory(String... strArr) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str : strArr) {
            if (str == null || str.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String str2 = "mp.messaging.incoming." + str + ".connector";
            linkedHashMap.put(str2, CONNECTOR);
            System.setProperty(str2, CONNECTOR);
        }
        return linkedHashMap;
    }

    public static Map<String, String> switchOutgoingChannelsToInMemory(String... strArr) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str : strArr) {
            if (str == null || str.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String str2 = "mp.messaging.outgoing." + str + ".connector";
            linkedHashMap.put(str2, CONNECTOR);
            System.setProperty(str2, CONNECTOR);
        }
        return linkedHashMap;
    }

    public static void clear() {
        ((List) System.getProperties().entrySet().stream().filter(entry -> {
            return CONNECTOR.equals(entry.getValue());
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toList())).forEach(System::clearProperty);
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
        Optional optionalValue = config.getOptionalValue("channel-name", String.class);
        InMemoryExceptions inMemoryExceptions = InMemoryExceptions.ex;
        Objects.requireNonNull(inMemoryExceptions);
        String str = (String) optionalValue.orElseThrow(inMemoryExceptions::illegalArgumentInvalidIncomingConfig);
        boolean booleanValue = ((Boolean) config.getOptionalValue("broadcast", Boolean.class).orElse(false)).booleanValue();
        return ((InMemorySourceImpl) this.sources.computeIfAbsent(str, str2 -> {
            return new InMemorySourceImpl(str2, booleanValue);
        })).source;
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(Config config) {
        Optional optionalValue = config.getOptionalValue("channel-name", String.class);
        InMemoryExceptions inMemoryExceptions = InMemoryExceptions.ex;
        Objects.requireNonNull(inMemoryExceptions);
        return ((InMemorySinkImpl) this.sinks.computeIfAbsent((String) optionalValue.orElseThrow(inMemoryExceptions::illegalArgumentInvalidOutgoingConfig), str -> {
            return new InMemorySinkImpl(str);
        })).sink;
    }

    public <T> InMemorySource<T> source(String str) {
        if (str == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySourceImpl<?> inMemorySourceImpl = this.sources.get(str);
        if (inMemorySourceImpl == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(str);
        }
        return inMemorySourceImpl;
    }

    public <T> InMemorySink<T> sink(String str) {
        if (str == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySinkImpl<?> inMemorySinkImpl = this.sinks.get(str);
        if (inMemorySinkImpl == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(str);
        }
        return inMemorySinkImpl;
    }
}
