package org.apache.camel.component.nats;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsMessage;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/nats/NatsProducer.class */
/**
 * Camel producer that sends the body of an exchange to the NATS topic configured on its
 * {@link NatsEndpoint}.
 *
 * <p>Exchanges whose pattern is out-capable are sent as a NATS request and completed
 * asynchronously, either with the reply or with an {@link ExchangeTimedOutException} once the
 * configured request timeout elapses. All other exchanges are published and completed
 * synchronously.
 */
public class NatsProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(NatsProducer.class);
    private final ExecutorServiceManager executorServiceManager;
    /** Runs the request-timeout tasks; created in {@link #doStart()}, shut down in {@link #doStop()}. */
    private ScheduledExecutorService scheduler;
    /** Connection used for publish/request; resolved in {@link #doStart()}. */
    private Connection connection;

    /**
     * Creates a producer bound to the given endpoint.
     *
     * @param natsEndpoint the endpoint this producer sends to; its Camel context supplies the
     *                     executor service manager used for the timeout scheduler
     */
    public NatsProducer(NatsEndpoint natsEndpoint) {
        super(natsEndpoint);
        this.executorServiceManager = natsEndpoint.getCamelContext().getExecutorServiceManager();
    }

    /**
     * Returns the endpoint narrowed to {@link NatsEndpoint}.
     */
    @Override
    public NatsEndpoint getEndpoint() {
        return (NatsEndpoint) super.getEndpoint();
    }

    /**
     * Sends the exchange body to the configured topic.
     *
     * <p>The body is taken as {@code byte[]} when available, otherwise as a mandatory
     * {@code String} encoded as UTF-8. A missing body is reported on the exchange and the
     * exchange is completed synchronously.
     *
     * @param exchange      the exchange whose in-message body and headers are sent
     * @param asyncCallback invoked exactly once when processing of the exchange is finished
     * @return {@code true} when the exchange was completed synchronously (publish, or invalid
     *         payload); {@code false} when completion happens later on the request reply/timeout
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        NatsConfiguration configuration = getEndpoint().getConfiguration();
        byte[] payload = exchange.getIn().getBody(byte[].class);
        if (payload == null) {
            try {
                // Explicit charset: the platform default is not guaranteed to be UTF-8,
                // and header values are already decoded as UTF-8 in buildHeaders.
                payload = exchange.getIn().getMandatoryBody(String.class).getBytes(StandardCharsets.UTF_8);
            } catch (InvalidPayloadException e) {
                exchange.setException(e);
                asyncCallback.done(true);
                return true;
            }
        }

        if (exchange.getPattern().isOutCapable()) {
            LOG.debug("Requesting to topic: {}", configuration.getTopic());
            NatsMessage requestMessage = NatsMessage.builder()
                    .data(payload)
                    .subject(configuration.getTopic())
                    .headers(buildHeaders(exchange))
                    .build();
            CompletableFuture<Message> request = this.connection.request(requestMessage);
            CompletableFuture<Message> timeout
                    = failAfter(exchange, Duration.ofMillis(configuration.getRequestTimeout()));

            // Whichever finishes first (reply or timeout) decides the outcome of the exchange.
            CompletableFuture.anyOf(request, timeout).whenComplete((reply, failure) -> {
                try {
                    if (failure == null) {
                        populateReply(exchange, (Message) reply);
                    } else {
                        // The failure is normally a wrapper around the real cause; fall back to
                        // the failure itself so the exchange is never marked failed with null.
                        Throwable cause = failure.getCause();
                        exchange.setException(cause != null ? cause : failure);
                    }
                } catch (RuntimeException e) {
                    exchange.setException(e);
                } finally {
                    // Always signal completion, even if copying the reply failed; otherwise
                    // the exchange would never continue.
                    asyncCallback.done(false);
                }
                // Release whichever future lost the race.
                if (!request.isDone()) {
                    request.cancel(true);
                }
                if (!timeout.isDone()) {
                    timeout.cancel(true);
                }
            });
            return false;
        }

        LOG.debug("Publishing to topic: {}", configuration.getTopic());
        NatsMessage.Builder builder = NatsMessage.builder()
                .data(payload)
                .subject(configuration.getTopic())
                .headers(buildHeaders(exchange));
        if (ObjectHelper.isNotEmpty(configuration.getReplySubject())) {
            builder.replyTo(configuration.getReplySubject());
        }
        this.connection.publish(builder.build());
        asyncCallback.done(true);
        return true;
    }

    /**
     * Copies the reply payload and its NATS metadata onto the exchange's message.
     */
    private void populateReply(Exchange exchange, Message reply) {
        exchange.getMessage().setBody(reply.getData());
        exchange.getMessage().setHeader(NatsConstants.NATS_REPLY_TO, reply.getReplyTo());
        exchange.getMessage().setHeader(NatsConstants.NATS_SID, reply.getSID());
        exchange.getMessage().setHeader(NatsConstants.NATS_SUBJECT, reply.getSubject());
        exchange.getMessage().setHeader(NatsConstants.NATS_QUEUE_NAME, reply.getSubscription().getQueueName());
        exchange.getMessage().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
    }

    /**
     * Converts the in-message headers of the exchange into NATS headers.
     *
     * <p>Headers rejected by the configured {@link HeaderFilterStrategy} are skipped.
     * {@code byte[]} values are decoded as UTF-8; every other value is rendered with
     * {@link String#valueOf(Object)}.
     *
     * @param exchange the exchange whose in-message headers are converted
     * @return the NATS headers to attach to the outgoing message
     */
    private Headers buildHeaders(Exchange exchange) {
        Headers natsHeaders = new Headers();
        HeaderFilterStrategy filterStrategy = getEndpoint().getConfiguration().getHeaderFilterStrategy();
        exchange.getIn().getHeaders().forEach((key, value) -> {
            if (filterStrategy.applyFilterToCamelHeaders(key, value, exchange)) {
                LOG.debug("Excluding header {} as per strategy", key);
                return;
            }
            String headerValue = value instanceof byte[]
                    ? new String((byte[]) value, StandardCharsets.UTF_8)
                    : String.valueOf(value);
            // Headers.add appends to an existing key or creates it, so no lookup is needed first.
            natsHeaders.add(key, headerValue);
        });
        return natsHeaders;
    }

    /**
     * Creates a future that fails with an {@link ExchangeTimedOutException} after the given delay.
     *
     * @param exchange the exchange reported in the timeout exception
     * @param duration the delay before the future is failed
     * @param <T>      the result type of the returned future (it never completes normally here)
     * @return a future completed exceptionally once {@code duration} has elapsed, unless it is
     *         completed or cancelled earlier by the caller
     */
    private <T> CompletableFuture<T> failAfter(Exchange exchange, Duration duration) {
        CompletableFuture<T> timeout = new CompletableFuture<>();
        ScheduledFuture<?> task = this.scheduler.schedule(
                () -> timeout.completeExceptionally(new ExchangeTimedOutException(exchange, duration.toMillis())),
                duration.toNanos(), TimeUnit.NANOSECONDS);
        // Once the future is settled (typically cancelled because the reply won), drop the
        // pending task so it does not keep the exchange referenced until the timeout elapses.
        timeout.whenComplete((ignoredResult, ignoredFailure) -> task.cancel(false));
        return timeout;
    }

    /**
     * Creates the request-timeout scheduler and resolves the NATS connection.
     *
     * <p>The scheduler uses the thread pool profile named
     * {@link NatsConstants#NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME} when one is registered,
     * otherwise the default profile. The connection set on the configuration is preferred; when
     * none is set, one is obtained from the endpoint.
     *
     * @throws Exception if the superclass start or obtaining the connection fails
     */
    @Override
    public void doStart() throws Exception {
        ThreadPoolProfile profile = this.executorServiceManager
                .getThreadPoolProfile(NatsConstants.NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME);
        if (profile == null) {
            profile = this.executorServiceManager.getDefaultThreadPoolProfile();
        }
        this.scheduler = this.executorServiceManager.newScheduledThreadPool(
                this, NatsConstants.NATS_REQUEST_TIMEOUT_THREAD_PROFILE_NAME, profile);
        super.doStart();
        LOG.debug("Starting Nats Producer");
        LOG.debug("Getting Nats Connection");
        Connection configured = getEndpoint().getConfiguration().getConnection();
        this.connection = configured != null ? configured : getEndpoint().getConnection();
    }

    /**
     * Shuts down the timeout scheduler and, when the connection was not supplied through the
     * configuration, optionally flushes and then closes it.
     *
     * @throws Exception if flushing, closing, or the superclass stop fails
     */
    @Override
    public void doStop() throws Exception {
        if (this.scheduler != null) {
            this.executorServiceManager.shutdownNow(this.scheduler);
        }
        LOG.debug("Stopping Nats Producer");
        try {
            // A connection set on the configuration is left open; only the one obtained
            // from the endpoint in doStart is closed here.
            if (ObjectHelper.isEmpty(getEndpoint().getConfiguration().getConnection())) {
                LOG.debug("Closing Nats Connection");
                if (this.connection != null && this.connection.getStatus() != Connection.Status.CLOSED) {
                    try {
                        if (getEndpoint().getConfiguration().isFlushConnection()) {
                            LOG.debug("Flushing Nats Connection");
                            this.connection.flush(
                                    Duration.ofMillis(getEndpoint().getConfiguration().getFlushTimeout()));
                        }
                    } finally {
                        // A failed flush must not leave the connection open.
                        this.connection.close();
                    }
                }
            }
        } finally {
            // Complete the superclass stop even when flushing or closing failed.
            super.doStop();
        }
    }
}
