package org.apache.camel.component.rocketmq.reply;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.Message;
import org.apache.camel.component.rocketmq.RocketMQEndpoint;
import org.apache.camel.component.rocketmq.RocketMQMessageConverter;
import org.apache.camel.component.rocketmq.RocketMQProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/rocketmq/reply/RocketMQReplyManagerSupport.class */
/**
 * Base {@link ReplyManager} that listens on a reply-to topic with a RocketMQ push consumer and
 * hands each incoming reply to the {@link ReplyHandler} registered under its message key.
 *
 * <p>Pending requests are tracked in a {@link ReplyTimeoutMap}; an entry leaves the map either
 * because a matching reply arrived, because it was cancelled, or because the map expired it.
 */
public class RocketMQReplyManagerSupport extends ServiceSupport implements ReplyManager {
    /**
     * Value reported in the shutdown debug log. The consumer's {@code shutdown()} is invoked
     * without a timeout argument, so this is informational only.
     */
    private static final int CLOSE_TIMEOUT = 30000;

    protected final CamelContext camelContext;
    protected ScheduledExecutorService executorService;
    protected RocketMQEndpoint endpoint;
    protected String replyToTopic;
    protected DefaultMQPushConsumer mqPushConsumer;
    protected ReplyTimeoutMap timeoutMap;
    protected final Logger log = LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
    /** Counted down every time {@link #setReplyToTopic(String)} is called; not awaited in this class. */
    protected final CountDownLatch replyToLatch = new CountDownLatch(1);

    /**
     * @param camelContext context whose executor service manager is used to shut down the
     *                     scheduled executor on stop
     */
    public RocketMQReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Starts the timeout map and then the reply consumer.
     *
     * <p>Both {@link #setScheduledExecutorService} and {@link #setEndpoint} must have been called
     * beforehand; the {@code notNull} checks below enforce that.
     *
     * @throws Exception if a prerequisite is missing or the consumer cannot be created/started
     */
    @Override // org.apache.camel.support.service.BaseService
    public void doStart() throws Exception {
        ObjectHelper.notNull(this.executorService, "executorService", this);
        ObjectHelper.notNull(this.endpoint, "endpoint", this);
        long checkerIntervalMillis = this.endpoint.getRequestTimeoutCheckerIntervalMillis();
        this.log.debug("Using timeout checker interval with {} millis", checkerIntervalMillis);
        this.timeoutMap = new ReplyTimeoutMap(this.executorService, checkerIntervalMillis);
        ServiceHelper.startService(this.timeoutMap);
        this.mqPushConsumer = createConsumer();
        this.mqPushConsumer.start();
        this.log.debug("Using executor {}", this.executorService);
    }

    /**
     * Builds (but does not start) the push consumer subscribed to the endpoint's reply-to topic.
     *
     * @return a configured consumer with the reply listener registered
     * @throws MQClientException if the subscription is rejected by the client
     */
    protected DefaultMQPushConsumer createConsumer() throws MQClientException {
        setReplyToTopic(this.endpoint.getReplyToTopic());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup(this.endpoint.getReplyToConsumerGroup());
        consumer.setNamesrvAddr(this.endpoint.getNamesrvAddr());
        consumer.subscribe(this.replyToTopic, "*");
        // The explicit target type is required: an implicitly typed lambda is ambiguous between
        // the concurrent and orderly registerMessageListener overloads.
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            // Handle every message of the batch; the whole batch is acknowledged below, so any
            // message that is skipped here would be lost.
            for (MessageExt messageExt : msgs) {
                onMessage(messageExt);
                this.log.trace("Consume message {}", messageExt);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }

    /**
     * Extracts the generated message key from a reply and forwards the reply to
     * {@link #handleReplyMessage(String, MessageExt)}.
     *
     * <p>The keys string is split on single spaces and the first token starting with
     * {@link RocketMQProducer#GENERATE_MESSAGE_KEY_PREFIX} is used. Replies without such a key
     * (including replies that carry no keys at all) are logged and ignored.
     *
     * @param messageExt the reply message received from the reply-to topic
     */
    public void onMessage(MessageExt messageExt) {
        String keys = messageExt.getKeys();
        String messageKey = null;
        // getKeys() yields null when the message was sent without keys; treat that as "no key".
        if (keys != null) {
            messageKey = Arrays.stream(keys.split(" "))
                    .filter(key -> key.startsWith(RocketMQProducer.GENERATE_MESSAGE_KEY_PREFIX))
                    .findFirst()
                    .orElse(null);
        }
        if (messageKey == null) {
            this.log.warn("Ignoring message with no messageKey: {}", messageExt);
            return;
        }
        this.log.debug("Received reply message with messageKey [{}] -> {}", messageKey, messageExt);
        handleReplyMessage(messageKey, messageExt);
    }

    /**
     * Stops the timeout map, shuts down the consumer and the scheduled executor.
     *
     * <p>Each step runs even if an earlier one throws, so a failure while stopping the timeout
     * map cannot leave the consumer or the executor running.
     */
    @Override // org.apache.camel.support.service.BaseService
    public void doStop() {
        try {
            ServiceHelper.stopService(this.timeoutMap);
        } finally {
            try {
                if (this.mqPushConsumer != null) {
                    this.log.debug("Closing connection: {} with timeout: {} ms.", this.mqPushConsumer, CLOSE_TIMEOUT);
                    this.mqPushConsumer.shutdown();
                    this.mqPushConsumer = null;
                }
            } finally {
                if (this.executorService != null) {
                    this.camelContext.getExecutorServiceManager().shutdownGraceful(this.executorService);
                    this.executorService = null;
                }
            }
        }
    }

    /** Sets the endpoint that supplies the reply-to topic, consumer group and name server address. */
    @Override // org.apache.camel.component.rocketmq.reply.ReplyManager
    public void setEndpoint(RocketMQEndpoint rocketMQEndpoint) {
        this.endpoint = rocketMQEndpoint;
    }

    /** Records the reply-to topic and counts down {@link #replyToLatch}. */
    @Override // org.apache.camel.component.rocketmq.reply.ReplyManager
    public void setReplyToTopic(String str) {
        this.log.debug("ReplyToTopic: {}", str);
        this.replyToTopic = str;
        this.replyToLatch.countDown();
    }

    /**
     * Registers a pending request so that a later reply carrying {@code str} can be correlated.
     *
     * @param replyManager  manager passed to the created reply handler
     * @param exchange      exchange awaiting the reply
     * @param asyncCallback callback passed to the created reply handler
     * @param str           message key identifying the request; must not already be registered
     * @param j             timeout in milliseconds for the registration
     * @return the message key that was registered
     * @throws IllegalArgumentException if a handler is already registered under {@code str}
     */
    @Override // org.apache.camel.component.rocketmq.reply.ReplyManager
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback asyncCallback, String str, long j) {
        ReplyHandler handler = new RocketMQReplyHandler(replyManager, exchange, asyncCallback, str, j);
        if (this.timeoutMap.putIfAbsent(str, handler, j) != null) {
            throw new IllegalArgumentException(String.format("The messageKey [%s] is not unique.", str));
        }
        return str;
    }

    /** Sets the executor handed to the timeout map on start; it is shut down in {@link #doStop()}. */
    @Override // org.apache.camel.component.rocketmq.reply.ReplyManager
    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
        this.executorService = scheduledExecutorService;
    }

    /**
     * Completes the exchange held by {@code replyHolder}, either with the received reply or with
     * an {@link ExchangeTimedOutException}, and then signals the callback with {@code done(false)}.
     *
     * <p>Nothing happens (the callback is not invoked) when this service is not allowed to run.
     *
     * @param replyHolder holder describing the reply or the timeout
     */
    @Override // org.apache.camel.component.rocketmq.reply.ReplyManager
    public void processReply(ReplyHolder replyHolder) {
        if (!isRunAllowed()) {
            return;
        }
        try {
            Exchange exchange = replyHolder.getExchange();
            if (replyHolder.isTimeout()) {
                // Guarded because the log arguments are computed eagerly.
                if (this.log.isWarnEnabled()) {
                    this.log.warn("Timeout occurred after {} millis waiting for reply message with messageKey [{}] on topic {}. Setting ExchangeTimedOutException on {} and continue routing.",
                            replyHolder.getTimeout(), replyHolder.getMessageKey(), this.replyToTopic, ExchangeHelper.logIds(exchange));
                }
                exchange.setException(new ExchangeTimedOutException(exchange, replyHolder.getTimeout(),
                        "reply message with messageKey: " + replyHolder.getMessageKey() + " not received on topic: " + this.replyToTopic));
            } else {
                processReceivedReply(replyHolder);
            }
        } finally {
            // Always signal completion, even if populating the exchange failed.
            replyHolder.getCallback().done(false);
        }
    }

    /** Copies the reply body and headers onto the out message of the holder's exchange. */
    private static void processReceivedReply(ReplyHolder replyHolder) {
        Message out = replyHolder.getExchange().getOut();
        MessageExt reply = replyHolder.getMessageExt();
        out.setBody(reply.getBody());
        RocketMQMessageConverter.populateHeadersByMessageExt(out, reply);
    }

    /**
     * Removes the pending registration for {@code str}, if there is one.
     *
     * @param str message key to cancel
     */
    @Override // org.apache.camel.component.rocketmq.reply.ReplyManager
    public void cancelMessageKey(String str) {
        // Single remove instead of get-then-remove, so the check and the removal cannot be
        // separated by a concurrent reply or expiry.
        if (this.timeoutMap.remove(str) != null) {
            this.log.warn("Cancelling messageKey: {}", str);
        }
    }

    /**
     * Dispatches a reply to the handler registered under {@code str}; replies for unknown keys
     * are logged and dropped.
     *
     * @param str        message key extracted from the reply
     * @param messageExt the reply message
     */
    protected void handleReplyMessage(String str, MessageExt messageExt) {
        // Claim the handler with one remove call: only the caller that actually removed the
        // entry may deliver the reply, which avoids completing the same request twice.
        ReplyHandler replyHandler = this.timeoutMap.remove(str);
        if (replyHandler == null) {
            this.log.warn("Reply received for unknown messageKey [{}]. The message will be ignored: {}", str, messageExt);
        } else {
            replyHandler.onReply(str, messageExt);
        }
    }
}
