/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.rocketmq.reply;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.Message;
import org.apache.camel.component.rocketmq.RocketMQEndpoint;
import org.apache.camel.component.rocketmq.RocketMQMessageConverter;
import org.apache.camel.component.rocketmq.reply.ReplyHandler;
import org.apache.camel.component.rocketmq.reply.ReplyHolder;
import org.apache.camel.component.rocketmq.reply.ReplyManager;
import org.apache.camel.component.rocketmq.reply.ReplyTimeoutMap;
import org.apache.camel.component.rocketmq.reply.RocketMQReplyHandler;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQReplyManagerSupport
extends ServiceSupport
implements ReplyManager {
    private static final int CLOSE_TIMEOUT = 30000;
    protected final Logger log = LoggerFactory.getLogger(RocketMQReplyManagerSupport.class);
    protected final CamelContext camelContext;
    protected final CountDownLatch replyToLatch = new CountDownLatch(1);
    protected ScheduledExecutorService executorService;
    protected RocketMQEndpoint endpoint;
    protected String replyToTopic;
    protected DefaultMQPushConsumer mqPushConsumer;
    protected ReplyTimeoutMap timeoutMap;

    public RocketMQReplyManagerSupport(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    @Override
    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.executorService, "executorService", this);
        ObjectHelper.notNull(this.endpoint, "endpoint", this);
        this.log.debug("Using timeout checker interval with {} millis", (Object)this.endpoint.getRequestTimeoutCheckerIntervalMillis());
        this.timeoutMap = new ReplyTimeoutMap(this.executorService, this.endpoint.getRequestTimeoutCheckerIntervalMillis());
        ServiceHelper.startService((Object)this.timeoutMap);
        this.mqPushConsumer = this.createConsumer();
        this.mqPushConsumer.start();
        this.log.debug("Using executor {}", (Object)this.executorService);
    }

    protected DefaultMQPushConsumer createConsumer() throws MQClientException {
        this.setReplyToTopic(this.endpoint.getReplyToTopic());
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setConsumerGroup(this.endpoint.getReplyToConsumerGroup());
        consumer.setNamesrvAddr(this.endpoint.getNamesrvAddr());
        consumer.subscribe(this.replyToTopic, "*");
        consumer.registerMessageListener((msgs, context) -> {
            MessageExt messageExt = (MessageExt)msgs.get(0);
            this.onMessage(messageExt);
            this.log.trace("Consume message {}", (Object)messageExt);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        return consumer;
    }

    public void onMessage(MessageExt messageExt) {
        String messageKey = Arrays.stream(messageExt.getKeys().split(" ")).filter(s -> s.startsWith("camel-rocketmq-")).findFirst().orElse(null);
        if (messageKey == null) {
            this.log.warn("Ignoring message with no messageKey: {}", (Object)messageExt);
            return;
        }
        this.log.debug("Received reply message with messageKey [{}] -> {}", (Object)messageKey, (Object)messageExt);
        this.handleReplyMessage(messageKey, messageExt);
    }

    @Override
    protected void doStop() {
        ServiceHelper.stopService((Object)this.timeoutMap);
        if (this.mqPushConsumer != null) {
            this.log.debug("Closing connection: {} with timeout: {} ms.", (Object)this.mqPushConsumer, (Object)30000);
            this.mqPushConsumer.shutdown();
            this.mqPushConsumer = null;
        }
        if (this.executorService != null) {
            this.camelContext.getExecutorServiceManager().shutdownGraceful(this.executorService);
            this.executorService = null;
        }
    }

    @Override
    public void setEndpoint(RocketMQEndpoint endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void setReplyToTopic(String replyToTopic) {
        this.log.debug("ReplyToTopic: {}", (Object)replyToTopic);
        this.replyToTopic = replyToTopic;
        this.replyToLatch.countDown();
    }

    @Override
    public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, String messageKey, long requestTimeout) {
        RocketMQReplyHandler handler = new RocketMQReplyHandler(replyManager, exchange, callback, messageKey, requestTimeout);
        ReplyHandler result = this.timeoutMap.putIfAbsent(messageKey, handler, requestTimeout);
        if (result != null) {
            String logMessage = String.format("The messageKey [%s] is not unique.", messageKey);
            throw new IllegalArgumentException(logMessage);
        }
        return messageKey;
    }

    @Override
    public void setScheduledExecutorService(ScheduledExecutorService executorService) {
        this.executorService = executorService;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void processReply(ReplyHolder holder) {
        if (!this.isRunAllowed()) {
            return;
        }
        try {
            Exchange exchange = holder.getExchange();
            if (holder.isTimeout()) {
                if (this.log.isWarnEnabled()) {
                    this.log.warn("Timeout occurred after {} millis waiting for reply message with messageKey [{}] on topic {}. Setting ExchangeTimedOutException on {} and continue routing.", new Object[]{holder.getTimeout(), holder.getMessageKey(), this.replyToTopic, ExchangeHelper.logIds(exchange)});
                }
                String msg = "reply message with messageKey: " + holder.getMessageKey() + " not received on topic: " + this.replyToTopic;
                exchange.setException(new ExchangeTimedOutException(exchange, holder.getTimeout(), msg));
            } else {
                RocketMQReplyManagerSupport.processReceivedReply(holder);
            }
        }
        finally {
            holder.getCallback().done(false);
        }
    }

    private static void processReceivedReply(ReplyHolder holder) {
        Message message = holder.getExchange().getOut();
        MessageExt messageExt = holder.getMessageExt();
        message.setBody(messageExt.getBody());
        RocketMQMessageConverter.populateHeadersByMessageExt(message, messageExt);
    }

    @Override
    public void cancelMessageKey(String messageKey) {
        if (null == this.timeoutMap.get(messageKey)) {
            return;
        }
        this.log.warn("Cancelling messageKey: {}", (Object)messageKey);
        this.timeoutMap.remove(messageKey);
    }

    protected void handleReplyMessage(String messageKey, MessageExt messageExt) {
        ReplyHandler handler = (ReplyHandler)this.timeoutMap.get(messageKey);
        if (handler != null) {
            this.timeoutMap.remove(messageKey);
            handler.onReply(messageKey, messageExt);
        } else {
            this.log.warn("Reply received for unknown messageKey [{}]. The message will be ignored: {}", (Object)messageKey, (Object)messageExt);
        }
    }
}

