/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.mq.listener;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.darkphoenixs.kafka.codec.KafkaMessageDecoder;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.listener.KafkaMessageConsumerListener;
import org.darkphoenixs.mq.codec.MQMessageDecoder;
import org.darkphoenixs.mq.consumer.MQConsumerAdapter;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.listener.MQMessageListener;
import org.darkphoenixs.mq.util.MQ_BATCH;
import org.darkphoenixs.mq.util.MQ_MODEL;
import org.darkphoenixs.mq.util.MQ_TYPE;
import org.darkphoenixs.rocketmq.listener.RocketmqMessageConsumerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQMessageListenerAdapter<T>
implements MQMessageListener<T> {
    protected Logger logger = LoggerFactory.getLogger(MQMessageListenerAdapter.class);
    protected KafkaMessageAdapter<String, T> kafkaMessageAdapter;
    protected MessageListener rocketMessageListener;
    private MQMessageDecoder<T> messageDecoder;
    private MQConsumerAdapter<T> consumerAdapter;
    private MQ_TYPE type;
    private MQ_BATCH batch = MQ_BATCH.NON_BATCH;
    private MQ_MODEL model = MQ_MODEL.MODEL_1;

    public MQMessageDecoder<T> getMessageDecoder() {
        return this.messageDecoder;
    }

    public void setMessageDecoder(MQMessageDecoder<T> messageDecoder) {
        this.messageDecoder = messageDecoder;
    }

    public MQConsumerAdapter<T> getConsumerAdapter() {
        return this.consumerAdapter;
    }

    public void setConsumerAdapter(MQConsumerAdapter<T> consumerAdapter) {
        this.consumerAdapter = consumerAdapter;
    }

    public String getType() {
        if (this.type != null) {
            return this.type.name();
        }
        return null;
    }

    public void setType(String type) throws MQException {
        this.type = MQ_TYPE.valueOf(type);
        this.initListener();
    }

    public String getBatch() {
        return this.batch.name();
    }

    public void setBatch(String batch) {
        this.batch = MQ_BATCH.valueOf(batch);
    }

    public String getModel() {
        return this.model.name();
    }

    public void setModel(String model) {
        this.model = MQ_MODEL.valueOf(model);
    }

    @Override
    public void onMessage(T message) throws MQException {
        if (this.consumerAdapter == null) {
            throw new MQException("MQConsumerAdapter is null !");
        }
        this.consumerAdapter.receive(message);
        this.logger.debug("Consume Success, Message : " + message);
    }

    public void onMessageWithKey(String key, T message) throws MQException {
        if (this.consumerAdapter == null) {
            throw new MQException("MQConsumerAdapter is null !");
        }
        this.consumerAdapter.receive(key, message);
        this.logger.debug("Consume Success, Key : " + key + " Message : " + message);
    }

    public void onMessageWithBatch(Map<String, T> messages) throws MQException {
        if (this.consumerAdapter == null) {
            throw new MQException("MQConsumerAdapter is null !");
        }
        this.consumerAdapter.receive(messages);
        this.logger.debug("Consume Success, Message size: " + messages.size());
    }

    public KafkaMessageAdapter<String, T> getKafkaMessageAdapter() {
        return this.kafkaMessageAdapter;
    }

    public MessageListener getRocketMessageListener() {
        return this.rocketMessageListener;
    }

    private void initListener() throws MQException {
        switch (this.type) {
            case KAFKA: {
                if (this.messageDecoder == null) {
                    throw new MQException("MessageDecoder must not null!");
                }
                this.kafkaMessageAdapter = new KafkaMessageAdapter();
                this.kafkaMessageAdapter.setBatch(this.getBatch());
                this.kafkaMessageAdapter.setModel(this.getModel());
                this.kafkaMessageAdapter.setDecoder(new KafkaMessageDecoder<String, T>(){

                    @Override
                    public String decodeKey(byte[] bytes) throws MQException {
                        if (bytes != null) {
                            return new String(bytes);
                        }
                        return null;
                    }

                    @Override
                    public T decodeVal(byte[] bytes) throws MQException {
                        return MQMessageListenerAdapter.this.messageDecoder.decode(bytes);
                    }

                    @Override
                    public List<T> batchDecode(List<byte[]> bytes) throws MQException {
                        return MQMessageListenerAdapter.this.messageDecoder.batchDecode(bytes);
                    }

                    @Override
                    public Map<String, T> batchDecode(Map<byte[], byte[]> bytes) throws MQException {
                        IdentityHashMap map = new IdentityHashMap();
                        if (bytes != null) {
                            for (Map.Entry<byte[], byte[]> entry : bytes.entrySet()) {
                                map.put(this.decodeKey(entry.getKey()), this.decodeVal(entry.getValue()));
                            }
                        }
                        return map;
                    }
                });
                this.kafkaMessageAdapter.setMessageListener(new KafkaMessageConsumerListener<String, T>(){

                    @Override
                    public void onMessage(String key, T val) throws MQException {
                        MQMessageListenerAdapter.this.onMessageWithKey(key, val);
                    }

                    @Override
                    public void onMessage(Map<String, T> messages) throws MQException {
                        MQMessageListenerAdapter.this.onMessageWithBatch(messages);
                    }
                });
                break;
            }
            case ROCKETMQ: {
                if (this.messageDecoder == null) {
                    throw new MQException("MessageDecoder must not null!");
                }
                RocketmqMessageConsumerListener rocketmqMessageConsumerListener = new RocketmqMessageConsumerListener<T>(){

                    @Override
                    public void onMessage(String key, T val) throws MQException {
                        MQMessageListenerAdapter.this.onMessageWithKey(key, val);
                    }

                    @Override
                    public void onMessage(Map<String, T> messages) throws MQException {
                        MQMessageListenerAdapter.this.onMessageWithBatch(messages);
                    }
                };
                rocketmqMessageConsumerListener.setBatch(this.getBatch());
                rocketmqMessageConsumerListener.setModel(this.getModel());
                rocketmqMessageConsumerListener.setMessageDecoder(this.getMessageDecoder());
                this.rocketMessageListener = rocketmqMessageConsumerListener.getMessageListener();
                break;
            }
            case ACTIVEMQ: {
                break;
            }
            default: {
                throw new MQException("MQ type non-exist default!");
            }
        }
    }
}

