/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.mq.producer;

import java.nio.charset.StandardCharsets;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.jms.Destination;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.darkphoenixs.activemq.producer.AbstractProducer;
import org.darkphoenixs.kafka.codec.KafkaMessageEncoder;
import org.darkphoenixs.kafka.core.KafkaDestination;
import org.darkphoenixs.kafka.core.KafkaMessageTemplate;
import org.darkphoenixs.kafka.pool.MessageSenderPool;
import org.darkphoenixs.mq.codec.MQMessageEncoder;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.producer.MQProducer;
import org.darkphoenixs.mq.util.MQ_TYPE;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;

/**
 * Adapter that exposes a single {@link MQProducer} facade over an ActiveMQ, Kafka or
 * RocketMQ producer.
 *
 * <p>The broker is selected with {@link #setType(String)}. That call builds the
 * broker-specific delegate from the values held by this adapter <em>at that moment</em>,
 * so every dependency the chosen broker needs (topic, encoder, sender pool, templates,
 * ...) must be set before {@code setType} is invoked. Setters called afterwards do not
 * affect a delegate that has already been built.
 *
 * <p>Subclasses implement {@link #doSend(Object)}, which every delegate calls back into.
 *
 * @param <T> the message payload type
 */
public abstract class MQProducerAdapter<T> implements MQProducer<T> {

    /** Error text used whenever no delegate is registered for the current type. */
    private static final String NO_PRODUCER_MESSAGE = "No matching MQProducer, Please check MQ_TYPE !";

    protected Logger logger = LoggerFactory.getLogger(this.getClass());
    private JmsTemplate activemqTemplate;
    private Destination activemqDestination;
    private MessageSenderPool<byte[], byte[]> kafkaMessageSenderPool;
    private DefaultMQProducer rocketmqDefaultProducer;
    private TransactionMQProducer rocketmqTransactionProducer;
    private MQMessageEncoder<T> messageEncoder;
    private String topic;
    private String producerKey;
    private MQ_TYPE type;

    /** Delegates built by {@link #initProducer()}, keyed by the type they were built for. */
    private final ConcurrentMap<MQ_TYPE, MQProducer<T>> producerConcurrentMap = new ConcurrentHashMap<MQ_TYPE, MQProducer<T>>();

    /** @return the JMS template used for the ActiveMQ delegate, or {@code null} if unset */
    public JmsTemplate getActivemqTemplate() {
        return this.activemqTemplate;
    }

    /** @param activemqTemplate the JMS template handed to the ActiveMQ delegate */
    public void setActivemqTemplate(JmsTemplate activemqTemplate) {
        this.activemqTemplate = activemqTemplate;
    }

    /** @return the JMS destination used for the ActiveMQ delegate, or {@code null} if unset */
    public Destination getActivemqDestination() {
        return this.activemqDestination;
    }

    /** @param activemqDestination the JMS destination handed to the ActiveMQ delegate */
    public void setActivemqDestination(Destination activemqDestination) {
        this.activemqDestination = activemqDestination;
    }

    /** @return the sender pool used for the Kafka delegate, or {@code null} if unset */
    public MessageSenderPool<byte[], byte[]> getKafkaMessageSenderPool() {
        return this.kafkaMessageSenderPool;
    }

    /** @param kafkaMessageSenderPool the sender pool handed to the Kafka message template */
    public void setKafkaMessageSenderPool(MessageSenderPool<byte[], byte[]> kafkaMessageSenderPool) {
        this.kafkaMessageSenderPool = kafkaMessageSenderPool;
    }

    /** @return the default RocketMQ producer, or {@code null} if unset */
    public DefaultMQProducer getRocketmqDefaultProducer() {
        return this.rocketmqDefaultProducer;
    }

    /** @param rocketmqDefaultProducer the default producer handed to the RocketMQ delegate */
    public void setRocketmqDefaultProducer(DefaultMQProducer rocketmqDefaultProducer) {
        this.rocketmqDefaultProducer = rocketmqDefaultProducer;
    }

    /** @return the transactional RocketMQ producer, or {@code null} if unset */
    public TransactionMQProducer getRocketmqTransactionProducer() {
        return this.rocketmqTransactionProducer;
    }

    /** @param rocketmqTransactionProducer the transactional producer handed to the RocketMQ delegate */
    public void setRocketmqTransactionProducer(TransactionMQProducer rocketmqTransactionProducer) {
        this.rocketmqTransactionProducer = rocketmqTransactionProducer;
    }

    /** @return the payload encoder, or {@code null} if unset */
    public MQMessageEncoder<T> getMessageEncoder() {
        return this.messageEncoder;
    }

    /** @param messageEncoder the payload encoder; required for the Kafka and RocketMQ types */
    public void setMessageEncoder(MQMessageEncoder<T> messageEncoder) {
        this.messageEncoder = messageEncoder;
    }

    /** @return the topic name, or {@code null} if unset */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic name; required for the Kafka and RocketMQ types */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the name of the selected {@link MQ_TYPE}, or {@code null} if none was selected */
    public String getType() {
        return this.type == null ? null : this.type.name();
    }

    /**
     * Selects the broker type and immediately builds the matching delegate producer.
     *
     * <p>The type is stored before the delegate is built, so if building fails the type
     * stays selected even though no new delegate was registered for it.
     *
     * @param type the name of an {@link MQ_TYPE} constant
     * @throws MQException if a dependency required by the chosen type has not been set
     * @throws IllegalArgumentException if {@code type} is not the name of an {@link MQ_TYPE} constant
     * @throws NullPointerException if {@code type} is {@code null}
     */
    public void setType(String type) throws MQException {
        this.type = MQ_TYPE.valueOf(type);
        this.initProducer();
    }

    /**
     * @return the explicitly configured producer key, falling back to the topic when no
     *         key has been set
     */
    @Override
    public String getProducerKey() {
        return this.producerKey != null ? this.producerKey : this.topic;
    }

    /** @param producerKey the producer key; when left {@code null} the topic is reported instead */
    public void setProducerKey(String producerKey) {
        this.producerKey = producerKey;
    }

    /**
     * Sends one message through the delegate registered for the current type.
     *
     * @param message the message to send
     * @throws MQException if no delegate is registered (including when no type has been
     *         selected yet), or if the delegate fails
     */
    @Override
    public void send(T message) throws MQException {
        MQProducer<T> mqProducer = this.getProducerInstance();
        if (mqProducer == null) {
            throw new MQException(NO_PRODUCER_MESSAGE);
        }
        mqProducer.send(message);
    }

    /**
     * Sends one message together with a key. Kafka and RocketMQ delegates receive the key;
     * the ActiveMQ delegate has no keyed send here, so the key is ignored and the message
     * is sent with a plain {@code send}.
     *
     * @param key the message key
     * @param message the message to send
     * @throws MQException if no delegate is registered, or if the delegate fails
     */
    public void sendWithKey(String key, T message) throws MQException {
        MQProducer<T> mqProducer = this.getProducerInstance();
        if (mqProducer instanceof org.darkphoenixs.kafka.producer.AbstractProducer) {
            // Safe: the map is private and the Kafka delegate is only ever created as
            // AbstractProducer<String, T> by createKafkaProducer().
            @SuppressWarnings("unchecked")
            org.darkphoenixs.kafka.producer.AbstractProducer<String, T> kafkaProducer =
                    (org.darkphoenixs.kafka.producer.AbstractProducer<String, T>) mqProducer;
            kafkaProducer.sendWithKey(key, message);
        } else if (mqProducer instanceof org.darkphoenixs.rocketmq.producer.AbstractProducer) {
            ((org.darkphoenixs.rocketmq.producer.AbstractProducer<T>) mqProducer).sendWithKey(key, message);
        } else if (mqProducer instanceof AbstractProducer) {
            mqProducer.send(message);
        } else {
            throw new MQException(NO_PRODUCER_MESSAGE);
        }
    }

    /**
     * Sends a list of messages. The RocketMQ delegate receives the whole list in one
     * {@code batchSend} call; the Kafka and ActiveMQ delegates receive the messages one by
     * one, in list order.
     *
     * @param messages the messages to send
     * @throws MQException if no delegate is registered, or if the delegate fails
     */
    public void batchSend(List<T> messages) throws MQException {
        MQProducer<T> mqProducer = this.getProducerInstance();
        if (mqProducer instanceof org.darkphoenixs.rocketmq.producer.AbstractProducer) {
            ((org.darkphoenixs.rocketmq.producer.AbstractProducer<T>) mqProducer).batchSend(messages);
        } else if (mqProducer instanceof org.darkphoenixs.kafka.producer.AbstractProducer
                || mqProducer instanceof AbstractProducer) {
            for (T message : messages) {
                mqProducer.send(message);
            }
        } else {
            throw new MQException(NO_PRODUCER_MESSAGE);
        }
    }

    /**
     * Builds the delegate for the currently selected type and registers it, replacing any
     * delegate previously registered for that type.
     *
     * @throws MQException if a required dependency is missing or the type is not handled
     */
    private void initProducer() throws MQException {
        final MQProducer<T> delegate;
        switch (this.type) {
            case KAFKA:
                delegate = this.createKafkaProducer();
                break;
            case ROCKETMQ:
                delegate = this.createRocketmqProducer();
                break;
            case ACTIVEMQ:
                delegate = this.createActivemqProducer();
                break;
            default:
                throw new MQException("MQ type non-exist default!");
        }
        this.producerConcurrentMap.put(this.type, delegate);
    }

    /** Builds the Kafka delegate from the topic, encoder and sender pool. */
    // KafkaMessageTemplate is used raw, as in the original code: its generic signature
    // is not visible from this file. NOTE(review): parameterize once confirmed.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private org.darkphoenixs.kafka.producer.AbstractProducer<String, T> createKafkaProducer() throws MQException {
        if (this.topic == null || this.messageEncoder == null || this.kafkaMessageSenderPool == null) {
            throw new MQException("Topic & MessageEncoder & KafkaMessageSenderPool must not null!");
        }
        org.darkphoenixs.kafka.producer.AbstractProducer<String, T> kafkaProducer =
                new org.darkphoenixs.kafka.producer.AbstractProducer<String, T>() {

                    @Override
                    protected T doSend(T message) throws MQException {
                        return MQProducerAdapter.this.doSend(message);
                    }
                };
        KafkaMessageTemplate kafkaMessageTemplate = new KafkaMessageTemplate();
        kafkaMessageTemplate.setMessageSenderPool(this.kafkaMessageSenderPool);
        kafkaMessageTemplate.setEncoder(this.createKafkaEncoder());
        kafkaProducer.setMessageTemplate(kafkaMessageTemplate);
        kafkaProducer.setDestination(new KafkaDestination(this.topic));
        kafkaProducer.setProducerKey(this.producerKey);
        return kafkaProducer;
    }

    /** Builds the Kafka encoder: keys become UTF-8 bytes, values go through the message encoder. */
    private KafkaMessageEncoder<String, T> createKafkaEncoder() {
        return new KafkaMessageEncoder<String, T>() {

            @Override
            public byte[] encodeKey(String key) throws MQException {
                // Explicit charset so the same key yields the same bytes on every host,
                // regardless of the JVM's platform default encoding.
                return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
            }

            @Override
            public byte[] encodeVal(T val) throws MQException {
                return MQProducerAdapter.this.messageEncoder.encode(val);
            }

            @Override
            public List<byte[]> batchEncode(List<T> message) throws MQException {
                return MQProducerAdapter.this.messageEncoder.batchEncode(message);
            }

            @Override
            public Map<byte[], byte[]> batchEncode(Map<String, T> messages) throws MQException {
                // byte[] has identity-based equals/hashCode, so an identity map states
                // the actual key semantics explicitly.
                Map<byte[], byte[]> encoded = new IdentityHashMap<byte[], byte[]>();
                if (messages != null) {
                    for (Map.Entry<String, T> entry : messages.entrySet()) {
                        encoded.put(this.encodeKey(entry.getKey()), this.encodeVal(entry.getValue()));
                    }
                }
                return encoded;
            }
        };
    }

    /** Builds the RocketMQ delegate; at least one of the two RocketMQ producers must be set. */
    private org.darkphoenixs.rocketmq.producer.AbstractProducer<T> createRocketmqProducer() throws MQException {
        boolean noRocketmqProducer = this.rocketmqDefaultProducer == null && this.rocketmqTransactionProducer == null;
        if (this.topic == null || this.messageEncoder == null || noRocketmqProducer) {
            throw new MQException("Topic & MessageEncoder & (RocketmqDefaultProducer | RocketmqTransactionProducer) must not null!");
        }
        org.darkphoenixs.rocketmq.producer.AbstractProducer<T> rocketmqProducer =
                new org.darkphoenixs.rocketmq.producer.AbstractProducer<T>() {

                    @Override
                    protected T doSend(T message) throws MQException {
                        return MQProducerAdapter.this.doSend(message);
                    }

                    @Override
                    protected List<T> doSend(List<T> messages) throws MQException {
                        return MQProducerAdapter.this.doSend(messages);
                    }
                };
        rocketmqProducer.setTopic(this.topic);
        rocketmqProducer.setMessageEncoder(this.messageEncoder);
        rocketmqProducer.setDefaultMQProducer(this.rocketmqDefaultProducer);
        rocketmqProducer.setTransactionMQProducer(this.rocketmqTransactionProducer);
        rocketmqProducer.setProducerKey(this.producerKey);
        return rocketmqProducer;
    }

    /** Builds the ActiveMQ delegate from the JMS destination and template. */
    private AbstractProducer<T> createActivemqProducer() throws MQException {
        if (this.activemqDestination == null || this.activemqTemplate == null) {
            throw new MQException("ActivemqDestination & ActivemqTemplate must not null!");
        }
        AbstractProducer<T> activemqProducer = new AbstractProducer<T>() {

            @Override
            protected Object doSend(T message) throws MQException {
                return MQProducerAdapter.this.doSend(message);
            }
        };
        activemqProducer.setDestination(this.activemqDestination);
        activemqProducer.setJmsTemplate(this.activemqTemplate);
        activemqProducer.setProducerKey(this.producerKey);
        return activemqProducer;
    }

    /**
     * Hook that every delegate calls back into for a single message.
     *
     * @param var1 the message handed to the delegate
     * @return the message the delegate should continue with
     * @throws MQException if the message must not be sent
     */
    protected abstract T doSend(T var1) throws MQException;

    /**
     * Batch counterpart of {@link #doSend(Object)}, called by the RocketMQ delegate.
     * The default implementation returns the list unchanged.
     *
     * @param messages the messages handed to the delegate
     * @return the messages the delegate should continue with
     * @throws MQException if the batch must not be sent
     */
    protected List<T> doSend(List<T> messages) throws MQException {
        return messages;
    }

    /**
     * Looks up the delegate registered for the current type.
     *
     * @return the delegate, or {@code null} when no type has been selected or no delegate
     *         was registered for it
     */
    public MQProducer<T> getProducerInstance() {
        MQ_TYPE currentType = this.type;
        if (currentType == null) {
            // ConcurrentHashMap rejects null keys with a NullPointerException; report
            // "no producer" instead so callers can raise their MQException.
            return null;
        }
        return this.producerConcurrentMap.get(currentType);
    }
}

