/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.memory;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor;
import io.smallrye.mutiny.operators.multi.processors.UnicastProcessor;
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.memory.InMemoryConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.memory.InMemoryConnectorOutgoingConfiguration;
import io.smallrye.reactive.messaging.memory.InMemorySink;
import io.smallrye.reactive.messaging.memory.InMemorySource;
import io.smallrye.reactive.messaging.memory.i18n.InMemoryExceptions;
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage;
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
import io.vertx.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.lang.invoke.LambdaMetafactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;

@ApplicationScoped
@Connector(value="smallrye-in-memory")
@ConnectorAttributes(value={@ConnectorAttribute(name="run-on-vertx-context", type="boolean", direction=ConnectorAttribute.Direction.INCOMING, description="Whether messages are dispatched on the Vert.x context or not.", defaultValue="false"), @ConnectorAttribute(name="broadcast", type="boolean", direction=ConnectorAttribute.Direction.INCOMING, description="Whether the messages are dispatched to multiple consumer", defaultValue="false")})
public class InMemoryConnector
implements InboundConnector,
OutboundConnector {
    public static final String CONNECTOR = "smallrye-in-memory";
    private final Map<String, InMemorySourceImpl<?>> sources = new HashMap();
    private final Map<String, InMemorySinkImpl<?>> sinks = new HashMap();
    @Inject
    ExecutionHolder executionHolder;

    public static Map<String, String> switchIncomingChannelsToInMemory(String ... channels) {
        LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
        for (String channel : channels) {
            if (channel == null || channel.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String key = "mp.messaging.incoming." + channel + ".connector";
            properties.put(key, CONNECTOR);
            System.setProperty(key, CONNECTOR);
        }
        return properties;
    }

    public static Map<String, String> switchOutgoingChannelsToInMemory(String ... channels) {
        LinkedHashMap<String, String> properties = new LinkedHashMap<String, String>();
        for (String channel : channels) {
            if (channel == null || channel.trim().isEmpty()) {
                throw InMemoryExceptions.ex.illegalArgumentChannelNameNull();
            }
            String key = "mp.messaging.outgoing." + channel + ".connector";
            properties.put(key, CONNECTOR);
            System.setProperty(key, CONNECTOR);
        }
        return properties;
    }

    public static void clear() {
        List<String> list = System.getProperties().entrySet().stream().filter(entry -> CONNECTOR.equals(entry.getValue())).map(entry -> (String)entry.getKey()).collect(Collectors.toList());
        list.forEach(System::clearProperty);
    }

    public Flow.Publisher<? extends Message<?>> getPublisher(Config config) {
        InMemoryConnectorIncomingConfiguration ic = new InMemoryConnectorIncomingConfiguration(config);
        String name = ic.getChannel();
        boolean broadcast = ic.getBroadcast();
        Vertx vertx = this.executionHolder.vertx();
        boolean runOnVertxContext = ic.getRunOnVertxContext();
        return this.sources.computeIfAbsent((String)name, (Function<String, InMemorySourceImpl>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$getPublisher$2(io.vertx.mutiny.core.Vertx boolean boolean java.lang.String ), (Ljava/lang/String;)Lio/smallrye/reactive/messaging/memory/InMemoryConnector$InMemorySourceImpl;)((Vertx)vertx, (boolean)runOnVertxContext, (boolean)broadcast)).source;
    }

    public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) {
        InMemoryConnectorOutgoingConfiguration ic = new InMemoryConnectorOutgoingConfiguration(config);
        String name = ic.getChannel();
        return this.sinks.computeIfAbsent((String)name, (Function<String, InMemorySinkImpl>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, <init>(java.lang.String ), (Ljava/lang/String;)Lio/smallrye/reactive/messaging/memory/InMemoryConnector$InMemorySinkImpl;)()).sink;
    }

    public <T> InMemorySource<T> source(String channel) {
        if (channel == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySourceImpl<?> source = this.sources.get(channel);
        if (source == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(channel);
        }
        return source;
    }

    public <T> InMemorySink<T> sink(String channel) {
        if (channel == null) {
            throw InMemoryExceptions.ex.illegalArgumentChannelMustNotBeNull();
        }
        InMemorySink sink = this.sinks.get(channel);
        if (sink == null) {
            throw InMemoryExceptions.ex.illegalArgumentUnknownChannel(channel);
        }
        return sink;
    }

    private static /* synthetic */ InMemorySourceImpl lambda$getPublisher$2(Vertx vertx, boolean runOnVertxContext, boolean broadcast, String n) {
        return new InMemorySourceImpl(n, vertx, runOnVertxContext, broadcast);
    }

    private static class InMemorySourceImpl<T>
    implements InMemorySource<T> {
        private final Flow.Processor<Message<T>, Message<T>> processor;
        private final Flow.Publisher<? extends Message<T>> source;
        private final String name;
        private final io.vertx.mutiny.core.Context context;
        private boolean runOnVertxContext;

        private InMemorySourceImpl(String name, Vertx vertx, boolean runOnVertxContext, boolean broadcast) {
            this.name = name;
            this.context = vertx.getOrCreateContext();
            this.runOnVertxContext = runOnVertxContext;
            this.processor = broadcast ? BroadcastProcessor.create() : UnicastProcessor.create();
            this.source = Multi.createFrom().publisher(this.processor);
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public InMemorySource<T> send(T messageOrPayload) {
            if (messageOrPayload instanceof Message) {
                if (this.runOnVertxContext) {
                    this.context.runOnContext(() -> this.processor.onNext(ContextAwareMessage.withContextMetadata((Message)((Message)messageOrPayload))));
                } else {
                    this.processor.onNext(ContextAwareMessage.withContextMetadata((Message)((Message)messageOrPayload)));
                }
            } else if (this.runOnVertxContext) {
                this.context.runOnContext(() -> this.processor.onNext((Message<ContextAwareMessage>)ContextAwareMessage.of((Object)messageOrPayload)));
            } else {
                this.processor.onNext((Message<ContextAwareMessage>)ContextAwareMessage.of(messageOrPayload));
            }
            return this;
        }

        @Override
        public InMemorySource<T> runOnVertxContext(boolean runOnVertxContext) {
            this.runOnVertxContext = runOnVertxContext;
            return this;
        }

        @Override
        public void complete() {
            if (this.runOnVertxContext) {
                this.context.runOnContext(() -> this.processor.onComplete());
            } else {
                this.processor.onComplete();
            }
        }

        @Override
        public void fail(Throwable failure) {
            if (this.runOnVertxContext) {
                this.context.runOnContext(() -> this.processor.onError(failure));
            } else {
                this.processor.onError(failure);
            }
        }
    }

    private static class InMemorySinkImpl<T>
    implements InMemorySink<T> {
        private final Flow.Subscriber<? extends Message<T>> sink;
        private final List<Message<T>> list = new CopyOnWriteArrayList<Message<T>>();
        private final AtomicReference<Throwable> failure = new AtomicReference();
        private final AtomicBoolean completed = new AtomicBoolean();
        private final String name;

        private InMemorySinkImpl(String name) {
            this.name = name;
            this.sink = MultiUtils.via(multi -> multi.call(m -> {
                this.list.add((Message<T>)m);
                Uni ack = Uni.createFrom().completionStage(() -> ((Message)m).ack());
                if (m.getMetadata(LocalContextMetadata.class).isPresent()) {
                    io.vertx.mutiny.core.Context ctx = io.vertx.mutiny.core.Context.newInstance((Context)((LocalContextMetadata)m.getMetadata(LocalContextMetadata.class).get()).context());
                    ack = ack.emitOn(arg_0 -> ((io.vertx.mutiny.core.Context)ctx).runOnContext(arg_0));
                }
                return ack;
            }).onFailure().invoke(err -> this.failure.compareAndSet((Throwable)null, (Throwable)err)).onCompletion().invoke(() -> this.completed.compareAndSet(false, true)));
        }

        @Override
        public String name() {
            return this.name;
        }

        @Override
        public List<? extends Message<T>> received() {
            return new ArrayList<Message<T>>(this.list);
        }

        @Override
        public void clear() {
            this.completed.set(false);
            this.failure.set(null);
            this.list.clear();
        }

        @Override
        public boolean hasCompleted() {
            return this.completed.get();
        }

        @Override
        public boolean hasFailed() {
            return this.getFailure() != null;
        }

        @Override
        public Throwable getFailure() {
            return this.failure.get();
        }
    }
}

