/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.jms.reply;

import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.Session;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.component.jms.JmsEndpoint;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.jms.JmsMessageHelper;
import org.apache.camel.component.jms.MessageListenerContainerFactory;
import org.apache.camel.component.jms.reply.CorrelationTimeoutMap;
import org.apache.camel.component.jms.reply.QueueReplyHandler;
import org.apache.camel.component.jms.reply.ReplyHandler;
import org.apache.camel.component.jms.reply.ReplyHolder;
import org.apache.camel.component.jms.reply.ReplyManager;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.support.task.ForegroundTask;
import org.apache.camel.support.task.Tasks;
import org.apache.camel.support.task.budget.Budgets;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

/**
 * Skeleton {@link ReplyManager} that keeps the state shared by reply managers:
 * the owning endpoint, the reply-to destination, the listener container that
 * receives replies and the {@link CorrelationTimeoutMap} that maps correlation
 * IDs to their pending {@link ReplyHandler}s.
 *
 * <p>Subclasses supply the listener container ({@link #createListenerContainer()})
 * and decide what to do with an incoming reply
 * ({@link #handleReplyMessage(String, Message, Session)}).
 */
public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager {
    /** Per-instance logger named after the concrete subclass. */
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final CamelContext camelContext;
    /** Passed to the {@link CorrelationTimeoutMap} created in {@link #doStart()}. */
    protected ScheduledExecutorService scheduledExecutorService;
    /** Passed to the {@link CorrelationTimeoutMap} created in {@link #doStart()}. */
    protected ExecutorService executorService;
    protected JmsEndpoint endpoint;
    /** Volatile because {@link #getReplyTo()} polls it while another caller may still be setting it. */
    protected volatile Destination replyTo;
    protected AbstractMessageListenerContainer listenerContainer;
    /** Pending reply handlers keyed by correlation ID; created in {@link #doStart()}. */
    protected CorrelationTimeoutMap correlation;
    /** Name of the JMS string property carrying the correlation ID, or {@code null} to use JMSCorrelationID. */
    protected String correlationProperty;

    protected ReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    @Override
    public void setScheduledExecutorService(ScheduledExecutorService executorService) {
        this.scheduledExecutorService = executorService;
    }

    @Override
    public void setOnTimeoutExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void setEndpoint(JmsEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void setReplyTo(Destination replyTo) {
        this.log.debug("ReplyTo destination: {}", replyTo);
        this.replyTo = replyTo;
    }

    @Override
    public void setCorrelationProperty(String correlationProperty) {
        this.correlationProperty = correlationProperty;
    }

    /**
     * Returns the reply-to destination, waiting for it to be set if necessary.
     *
     * <p>When the destination is not yet known, this polls the field up to the
     * configured number of iterations, pausing the configured interval between
     * attempts. If the budget is exhausted a warning is logged and whatever the
     * field holds at that moment is returned.
     *
     * @return the reply-to destination, or {@code null} if it was still unset after waiting
     */
    @Override
    public Destination getReplyTo() {
        if (this.replyTo != null) {
            return this.replyTo;
        }
        long interval = this.endpoint.getConfiguration().getWaitForTemporaryReplyToToBeUpdatedThreadSleepingTime();
        int max = this.endpoint.getConfiguration().getWaitForTemporaryReplyToToBeUpdatedCounter();
        long totalWaitMillis = interval * (long) max;
        this.log.trace("Waiting for replyTo destination to be ready (timeout: {} millis)", totalWaitMillis);
        ForegroundTask task = Tasks.foregroundTask()
                .withBudget(Budgets.iterationBudget()
                        .withMaxIterations(max)
                        .withInterval(Duration.ofMillis(interval))
                        .build())
                .build();
        boolean done = task.run(() -> {
            // Read the volatile field once so the logged value and the returned value agree.
            boolean ready = this.replyTo != null;
            this.log.trace("Waiting for replyTo to be ready: {}", ready);
            return ready;
        });
        if (!done) {
            this.log.warn("ReplyTo destination was not ready and timeout ({} millis) occurred", totalWaitMillis);
        }
        return this.replyTo;
    }

    /**
     * Registers a handler for the reply expected under {@code correlationId}.
     *
     * @return {@code correlationId}, unchanged
     * @throws IllegalArgumentException if a handler is already registered for {@code correlationId}
     */
    @Override
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout) {
        QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
        // A non-null result means another handler already owns this correlation ID.
        ReplyHandler existing = this.correlation.putIfAbsent(correlationId, handler, requestTimeout);
        if (existing != null) {
            throw new IllegalArgumentException(String.format("The correlationId [%s] is not unique.", correlationId));
        }
        return correlationId;
    }

    /**
     * Creates the handler for a pending reply. Parameters mirror those of
     * {@link #registerReply(ReplyManager, Exchange, AsyncCallback, String, String, long)}.
     */
    protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String originalCorrelationId, String correlationId, long requestTimeout);

    /**
     * Entry point for an incoming reply message.
     *
     * <p>The correlation ID is taken from the configured correlation property when
     * one is set, otherwise from the message's JMS correlation ID. Messages without
     * a readable correlation ID are logged and dropped; all others are handed to
     * {@link #handleReplyMessage(String, Message, Session)}.
     */
    public void onMessage(Message message, Session session) throws JMSException {
        String correlationID = null;
        try {
            correlationID = this.correlationProperty == null
                    ? JmsMessageHelper.getJMSCorrelationID(message)
                    : message.getStringProperty(this.correlationProperty);
        } catch (Exception e) {
            // Best effort: a message whose correlation ID cannot be read is treated like one
            // without a correlation ID, but the cause is recorded for troubleshooting.
            this.log.debug("Could not read correlationID from reply message: {}", message, e);
        }
        if (correlationID == null) {
            this.log.warn("Ignoring message with no correlationID: {}", message);
            return;
        }
        this.log.debug("Received reply message with correlationID [{}] -> {}", correlationID, message);
        this.handleReplyMessage(correlationID, message, session);
    }

    /**
     * Completes the exchange held by {@code holder}.
     *
     * <p>On timeout an {@link ExchangeTimedOutException} is set on the exchange.
     * Otherwise the reply is wrapped in a {@link JmsMessage} and set as the OUT
     * message; the "CamelJMSDestinationProduced" header of the IN message is
     * carried over, an {@link Exception} body is set as the exchange exception when
     * the endpoint transfers exceptions, and the original correlation ID (if any)
     * is restored on both the JMS message and the OUT message header.
     *
     * <p>Nothing happens when {@code holder} is {@code null} or
     * {@code isRunAllowed()} returns {@code false}. Otherwise the holder's callback
     * is always invoked with {@code done(false)}, even if processing throws.
     */
    @Override
    public void processReply(ReplyHolder holder) {
        if (holder == null || !this.isRunAllowed()) {
            return;
        }
        try {
            Exchange exchange = holder.getExchange();
            Object to = exchange.getIn().getHeader("CamelJMSDestinationProduced");
            if (holder.isTimeout()) {
                if (this.log.isWarnEnabled()) {
                    this.log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}. Setting ExchangeTimedOutException on {} and continue routing.", holder.getRequestTimeout(), holder.getCorrelationId(), this.replyTo, ExchangeHelper.logIds(exchange));
                }
                String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + String.valueOf(this.replyTo);
                exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
            } else {
                Message message = holder.getMessage();
                Session session = holder.getSession();
                JmsMessage response = new JmsMessage(exchange, message, session, this.endpoint.getBinding());
                exchange.setOut(response);
                Object body = response.getBody();
                if (to != null) {
                    // Keep the produced-destination header visible on the reply.
                    response.setHeader("CamelJMSDestinationProduced", to);
                }
                if (this.endpoint.isTransferException() && body instanceof Exception) {
                    this.log.debug("Reply was an Exception. Setting the Exception on the Exchange: {}", body);
                    exchange.setException((Exception) body);
                } else {
                    this.log.debug("Reply received. OUT message body set to reply payload: {}", body);
                }
                if (holder.getOriginalCorrelationId() != null) {
                    JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
                    exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
                }
            }
        } finally {
            // The caller is waiting on this callback; signal it whatever happened above.
            AsyncCallback callback = holder.getCallback();
            callback.done(false);
        }
    }

    /** Handles a reply message whose correlation ID has already been extracted. */
    protected abstract void handleReplyMessage(String correlationID, Message message, Session session);

    /** Creates the listener container that {@link #doStart()} initializes and starts. */
    protected abstract AbstractMessageListenerContainer createListenerContainer() throws Exception;

    /**
     * Repeatedly tries to take the handler for {@code correlationID} out of the
     * correlation map, for replies that arrive before their handler can be found.
     *
     * <p>The number of attempts and the pause between them come from the endpoint
     * configuration.
     *
     * @return the handler removed from the correlation map, or {@code null} if none appeared in time
     */
    protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, Message message) {
        if (this.log.isWarnEnabled()) {
            this.log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message);
        }
        long interval = this.endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedThreadSleepingTime();
        int maxIterations = this.endpoint.getConfiguration().getWaitForProvisionCorrelationToBeUpdatedCounter();
        ForegroundTask task = Tasks.foregroundTask()
                .withBudget(Budgets.iterationBudget()
                        .withMaxIterations(maxIterations)
                        .withInterval(Duration.ofMillis(interval))
                        .build())
                .build();
        return task.run(() -> this.getReplyHandler(correlationID), Objects::nonNull).orElse(null);
    }

    /** Single lookup attempt: removes and returns the handler for {@code correlationID}, if present. */
    private ReplyHandler getReplyHandler(String correlationID) {
        this.log.trace("Early reply not found. Waiting a bit longer.");
        return (ReplyHandler)this.correlation.remove(correlationID);
    }

    /**
     * Validates the required collaborators, starts the correlation map and then
     * creates, initializes and starts the reply listener container.
     */
    @Override
    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.executorService, "executorService", this);
        ObjectHelper.notNull(this.scheduledExecutorService, "scheduledExecutorService", this);
        ObjectHelper.notNull(this.endpoint, "endpoint", this);
        this.log.trace("Using timeout checker interval with {} millis", this.endpoint.getRequestTimeoutCheckerInterval());
        this.correlation = new CorrelationTimeoutMap(this.scheduledExecutorService, this.endpoint.getRequestTimeoutCheckerInterval(), this.executorService);
        ServiceHelper.startService(this.correlation);
        this.listenerContainer = this.createListenerContainer();
        this.listenerContainer.afterPropertiesSet();
        this.log.debug("Starting reply listener container on endpoint: {}", this.endpoint);
        this.endpoint.onListenerContainerStarting();
        this.listenerContainer.start();
    }

    /**
     * Stops the correlation map, stops and destroys the listener container, and
     * gracefully shuts down both executor services through the Camel context.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(this.correlation);
        if (this.listenerContainer != null) {
            this.log.debug("Stopping reply listener container on endpoint: {}", this.endpoint);
            try {
                try {
                    this.listenerContainer.stop();
                } finally {
                    // destroy() must run even when stop() fails, otherwise the container's
                    // resources are never released.
                    this.listenerContainer.destroy();
                }
            } finally {
                this.endpoint.onListenerContainerStopped();
                this.listenerContainer = null;
            }
        }
        if (this.scheduledExecutorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful(this.scheduledExecutorService);
            this.scheduledExecutorService = null;
        }
        if (this.executorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful(this.executorService);
            this.executorService = null;
        }
    }

    /**
     * Gives the reply listener container a client ID derived from the endpoint's:
     * the endpoint client ID with {@code ".CamelReplyManager"} appended. Does
     * nothing when the endpoint has no client ID.
     */
    protected static void setupClientId(JmsEndpoint endpoint, DefaultMessageListenerContainer answer) {
        String clientId = endpoint.getClientId();
        if (clientId != null) {
            answer.setClientId(clientId + ".CamelReplyManager");
        }
    }

    /**
     * Creates a listener container through the endpoint's configured
     * {@link MessageListenerContainerFactory}.
     *
     * @throws IllegalArgumentException if no factory has been configured
     */
    protected static AbstractMessageListenerContainer getAbstractMessageListenerContainer(JmsEndpoint endpoint) {
        MessageListenerContainerFactory factory = endpoint.getConfiguration().getMessageListenerContainerFactory();
        if (factory == null) {
            throw new IllegalArgumentException("ReplyToConsumerType.Custom requires that a MessageListenerContainerFactory has been configured");
        }
        return factory.createMessageListenerContainer(endpoint);
    }
}

