/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.common.OffsetAndMetadata;
import kafka.common.TopicAndPartition;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.DefaultDecoder;
import kafka.utils.VerifiableProperties;
import org.darkphoenixs.kafka.core.KafkaDestination;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;
import org.darkphoenixs.kafka.core.KafkaMessageReceiverImpl;
import org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry;
import org.darkphoenixs.kafka.pool.KafkaPoolThreadFactory;
import org.darkphoenixs.kafka.pool.MessageReceiverPool;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.util.RefleTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

/**
 * Pool of consumer threads that read a single Kafka topic through the
 * high-level {@link ConsumerConnector} API.
 *
 * <p>{@link #init()} builds a connector from {@link #props}, opens one
 * {@link KafkaStream} per pool thread and submits a {@link ReceiverThread}
 * for each stream. Every message read from a stream is handed to the
 * configured {@link KafkaMessageAdapter}. {@link #destroy()} shuts down the
 * connector, the executor and the retry helper.
 *
 * @param <K> key type produced by the key decoder
 * @param <V> value type produced by the value decoder
 */
public class KafkaMessageReceiverPool<K, V>
implements MessageReceiverPool<K, V> {
    /** Prefix used for the default thread-factory name. */
    private static final String tagger = "KafkaMessageReceiverPool";
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageReceiverPool.class);

    /** Connector created by {@link #init()}; {@code null} before that. */
    protected ConsumerConnector consumer;
    /** Executor running the {@link ReceiverThread}s; {@code null} before {@link #init()}. */
    protected ExecutorService pool;
    /** Consumer configuration handed to {@link ConsumerConfig}. */
    protected Properties props = new Properties();
    /** Set to {@code true} at the end of {@link #init()} and to {@code false} at the end of {@link #destroy()}. */
    protected AtomicBoolean running = new AtomicBoolean(false);

    private KafkaMessageAdapter<?, ?> messageAdapter;
    private KafkaDestination destination;
    private int poolSize;
    private Resource config;
    private int retryCount = 3;
    private KafkaMessageReceiverRetry<MessageAndMetadata<K, V>> receiverRetry;
    private Class<?> keyDecoderClass = DefaultDecoder.class;
    private Class<?> valDecoderClass = DefaultDecoder.class;
    private ThreadFactory threadFactory;

    /**
     * @return the thread factory used for the pool, or {@code null} if none
     *         has been set and {@link #init()} has not created the default yet
     */
    public ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }

    /**
     * Sets the thread factory used by {@link #init()} to create the pool threads.
     *
     * @param threadFactory factory to use; when {@code null}, {@link #init()}
     *                      creates a {@link KafkaPoolThreadFactory}
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    /** @return the {@code client.id} property, or {@code null} if unset */
    public String getClientId() {
        return this.props.getProperty("client.id");
    }

    /** @param clientId value stored under the {@code client.id} property */
    public void setClientId(String clientId) {
        this.props.setProperty("client.id", clientId);
    }

    /** @return the destination whose name is used as the topic in {@link #init()} */
    public KafkaDestination getDestination() {
        return this.destination;
    }

    /** @param destination destination whose name is used as the topic in {@link #init()} */
    public void setDestination(KafkaDestination destination) {
        this.destination = destination;
    }

    /** @return the {@code zookeeper.connect} property, or {@code null} if unset */
    public String getZookeeperStr() {
        return this.props.getProperty("zookeeper.connect");
    }

    /** @param zookeeperStr value stored under the {@code zookeeper.connect} property */
    public void setZookeeperStr(String zookeeperStr) {
        this.props.setProperty("zookeeper.connect", zookeeperStr);
    }

    /**
     * @return the {@code auto.commit.enable} property parsed as a boolean;
     *         {@code true} when the property is unset
     */
    public Boolean getAutoCommit() {
        return Boolean.valueOf(this.props.getProperty("auto.commit.enable", "true"));
    }

    /** @param autoCommit value stored under the {@code auto.commit.enable} property */
    public void setAutoCommit(boolean autoCommit) {
        this.props.setProperty("auto.commit.enable", String.valueOf(autoCommit));
    }

    /** @return the retry count passed to the retry helper; defaults to 3 */
    public int getRetryCount() {
        return this.retryCount;
    }

    /**
     * @param retryCount retry count passed to the retry helper; a value of 0
     *                   or less makes {@link #init()} skip creating the helper
     */
    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    /** @return the live consumer properties (not a copy) */
    public Properties getProps() {
        return this.props;
    }

    /** @param props replacement for the consumer properties */
    public void setProps(Properties props) {
        this.props = props;
    }

    /** @return the configured number of consumer threads */
    public int getPoolSize() {
        return this.poolSize;
    }

    /**
     * @param poolSize number of consumer threads; {@link #init()} replaces 0
     *                 or a value above the partition count with the partition count
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /** @return the resource last passed to {@link #setConfig(Resource)} */
    public Resource getConfig() {
        return this.config;
    }

    /**
     * Stores the resource and loads its entries into {@link #props}.
     * An {@link IOException} while loading is logged and otherwise ignored.
     *
     * @param config properties resource to load
     */
    public void setConfig(Resource config) {
        this.config = config;
        try {
            PropertiesLoaderUtils.fillProperties(this.props, this.config);
        } catch (IOException e) {
            logger.error("Fill properties failed.", e);
        }
    }

    /** @return the class instantiated as key decoder in {@link #init()} */
    public Class<?> getKeyDecoderClass() {
        return this.keyDecoderClass;
    }

    /** @param keyDecoderClass class instantiated as key decoder in {@link #init()} */
    public void setKeyDecoderClass(Class<?> keyDecoderClass) {
        this.keyDecoderClass = keyDecoderClass;
    }

    /** @return the class instantiated as value decoder in {@link #init()} */
    public Class<?> getValDecoderClass() {
        return this.valDecoderClass;
    }

    /** @param valDecoderClass class instantiated as value decoder in {@link #init()} */
    public void setValDecoderClass(Class<?> valDecoderClass) {
        this.valDecoderClass = valDecoderClass;
    }

    /** @return the adapter that receives every consumed message */
    public KafkaMessageAdapter<?, ?> getMessageAdapter() {
        return this.messageAdapter;
    }

    /**
     * Sets the adapter that receives every consumed message. When the adapter
     * reports a non-null destination, that destination also replaces this
     * pool's destination.
     *
     * @param messageAdapter adapter to use; must not be {@code null}
     */
    public void setMessageAdapter(KafkaMessageAdapter<?, ?> messageAdapter) {
        this.messageAdapter = messageAdapter;
        if (messageAdapter.getDestination() != null) {
            this.setDestination(messageAdapter.getDestination());
        }
    }

    /**
     * Creates a new receiver bound to this pool's properties.
     *
     * @return a freshly constructed receiver; nothing is cached or reused
     */
    @Override
    public KafkaMessageReceiver<K, V> getReceiver() {
        return new KafkaMessageReceiverImpl<K, V>(this.props, this);
    }

    /**
     * Shuts the given receiver down.
     *
     * @param receiver receiver to shut down; {@code null} is ignored
     */
    @Override
    public void returnReceiver(KafkaMessageReceiver<K, V> receiver) {
        if (receiver != null) {
            receiver.shutDown();
        }
    }

    /**
     * Starts consuming: sizes the pool from the topic's partition count,
     * creates the retry helper (when {@code retryCount > 0}), the executor and
     * the consumer connector, then submits one {@link ReceiverThread} per stream.
     */
    @Override
    @SuppressWarnings("unchecked") // decoder instances are created reflectively from Class<?>
    public synchronized void init() {
        String topic = this.destination.getDestinationName();

        // NOTE(review): the receiver obtained here is never passed to
        // returnReceiver(); confirm whether it holds resources to release.
        int defaultSize = this.getReceiver().getPartitionCount(topic);
        if (this.poolSize == 0 || this.poolSize > defaultSize) {
            this.setPoolSize(defaultSize);
        }

        if (this.retryCount > 0) {
            this.receiverRetry = new KafkaMessageReceiverRetry<MessageAndMetadata<K, V>>(topic, this.retryCount, this.messageAdapter);
        }

        // Keep a factory supplied through setThreadFactory(); only fall back
        // to the default when none was configured.
        if (this.threadFactory == null) {
            this.threadFactory = new KafkaPoolThreadFactory(tagger + "-" + topic);
        }
        this.pool = Executors.newFixedThreadPool(this.poolSize, this.threadFactory);

        logger.info("Message receiver pool initializing. poolSize : " + this.poolSize + " config : " + this.props.toString());

        this.consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(this.props));

        // Request one stream per pool thread for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, this.poolSize);

        VerifiableProperties verProps = new VerifiableProperties(this.props);
        Decoder<K> keyDecoder = (Decoder<K>) RefleTool.newInstance(this.keyDecoderClass, verProps);
        Decoder<V> valDecoder = (Decoder<V>) RefleTool.newInstance(this.valDecoderClass, verProps);

        Map<String, List<KafkaStream<K, V>>> consumerMap = this.consumer.createMessageStreams(topicCountMap, keyDecoder, valDecoder);
        List<KafkaStream<K, V>> streams = consumerMap.get(topic);
        for (KafkaStream<K, V> stream : streams) {
            this.pool.submit(new ReceiverThread(stream, this.messageAdapter));
        }

        logger.info("Message receiver pool initialized.");
        this.running.set(true);
    }

    /**
     * Stops consuming: shuts down the connector, then the executor (waiting up
     * to five seconds for its threads), then the retry helper. Each step is
     * skipped when the corresponding object was never created.
     */
    @Override
    public synchronized void destroy() {
        logger.info("Message receiver pool closing.");
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
        if (this.pool != null) {
            this.pool.shutdown();
            try {
                if (!this.pool.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                    logger.warn("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
            } catch (InterruptedException e) {
                logger.error("Interrupted during shutdown, exiting uncleanly");
                // Restore the flag so the caller can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
        if (this.receiverRetry != null) {
            this.receiverRetry.destroy();
        }
        logger.info("Message receiver pool closed.");
        this.running.set(false);
    }

    /** @return {@code true} between the end of {@link #init()} and the end of {@link #destroy()} */
    @Override
    public synchronized boolean isRunning() {
        return this.running.get();
    }

    /**
     * Task that drains one {@link KafkaStream}, passing each message to the
     * adapter and, when auto-commit is disabled, committing the offset after
     * every message.
     */
    class ReceiverThread
    implements Runnable {
        private final KafkaStream<K, V> stream;
        private final KafkaMessageAdapter<?, ?> adapter;

        /**
         * @param stream  stream to consume
         * @param adapter adapter that receives each message
         */
        public ReceiverThread(KafkaStream<K, V> stream, KafkaMessageAdapter<?, ?> adapter) {
            this.stream = stream;
            this.adapter = adapter;
        }

        /**
         * Consumes the stream until its iterator ends. A failure while handling
         * a message is logged and does not stop the loop; an {@link MQException}
         * additionally hands the message to the retry helper when one exists.
         */
        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " clientId: " + this.stream.clientId() + " start.");
            for (MessageAndMetadata<K, V> messageAndMetadata : this.stream) {
                try {
                    this.adapter.messageAdapter(messageAndMetadata);
                } catch (MQException e) {
                    if (KafkaMessageReceiverPool.this.receiverRetry != null) {
                        KafkaMessageReceiverPool.this.receiverRetry.receiveMessageRetry(messageAndMetadata);
                    }
                    logFailure(messageAndMetadata, e);
                } catch (RuntimeException e) {
                    // Handle unexpected failures explicitly. The task is started
                    // with submit(), so an exception escaping here would end this
                    // consumer thread without any log entry.
                    logFailure(messageAndMetadata, e);
                } finally {
                    // A plain conditional: a jump statement inside finally would
                    // discard any exception still in flight.
                    if (!KafkaMessageReceiverPool.this.getAutoCommit().booleanValue()) {
                        commitNextOffset(messageAndMetadata);
                    }
                }
            }
            logger.info(Thread.currentThread().getName() + " clientId: " + this.stream.clientId() + " end.");
        }

        /** Logs a handling failure together with the message's topic, offset and partition. */
        private void logFailure(MessageAndMetadata<K, V> message, Throwable cause) {
            logger.error("Receive message failed. topic: " + message.topic() + " offset: " + message.offset() + " partition: " + message.partition(), cause);
        }

        /** Commits {@code offset + 1} for the message's topic and partition. */
        private void commitNextOffset(MessageAndMetadata<K, V> message) {
            TopicAndPartition topicAndPartition = TopicAndPartition.apply(message.topic(), message.partition());
            OffsetAndMetadata nextOffset = OffsetAndMetadata.apply(message.offset() + 1L);
            KafkaMessageReceiverPool.this.consumer.commitOffsets(Collections.singletonMap(topicAndPartition, nextOffset), true);
        }
    }
}

