/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.darkphoenixs.kafka.core.KafkaDestination;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.core.KafkaMessageNewReceiver;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;
import org.darkphoenixs.kafka.pool.KafkaMessageReceiverMonitor;
import org.darkphoenixs.kafka.pool.KafkaMessageReceiverRetry;
import org.darkphoenixs.kafka.pool.KafkaPoolThreadFactory;
import org.darkphoenixs.kafka.pool.MessageReceiverPool;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.util.MQ_BATCH;
import org.darkphoenixs.mq.util.MQ_MODEL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

public class KafkaMessageNewReceiverPool<K, V>
implements MessageReceiverPool<K, V> {
    /** Logger shared by the pool and its inner worker classes. */
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageNewReceiverPool.class);
    /** Hand-off queue between receiver and handler threads; init() creates it only for MODEL_2. */
    protected BlockingQueue<ConsumerRecords<K, V>> blockingQueue;
    /** Executor running the ReceiverThread tasks; created by init(). */
    protected ExecutorService receivPool;
    /** Executor running the HandlerThread tasks; created by init() only for MODEL_2. */
    protected ExecutorService handlePool;
    /** Receiver tasks submitted by init(); destroy() shuts them down and clears the list. */
    protected List<ReceiverThread> receivThreads = new ArrayList<ReceiverThread>();
    /** Handler tasks submitted by init(); destroy() shuts them down and clears the list. */
    protected List<HandlerThread> handleThreads = new ArrayList<HandlerThread>();
    /** Set to true at the end of init() and to false at the end of destroy(). */
    protected AtomicBoolean running = new AtomicBoolean(false);
    /** Threading model: MODEL_1 handles records on the receiver thread, MODEL_2 hands them to the handler pool. */
    private MQ_MODEL model = MQ_MODEL.MODEL_1;
    /** Whether the adapter receives a whole poll result (BATCH) or one record at a time (NON_BATCH). */
    private MQ_BATCH batch = MQ_BATCH.NON_BATCH;
    /** Offset commit strategy used by commit() and batchCommit(). */
    private COMMIT commit = COMMIT.AUTO_COMMIT;
    /** Consumer properties; cloned for every consumer this pool creates. */
    private Properties props = new Properties();
    /** Resource whose entries setConfig() merges into props. */
    private Resource config;
    /** Number of receiver threads; init() caps it at the topic's partition count. */
    private int poolSize;
    /** Handler pool size in MODEL_2 is poolSize * handleMultiple + 1. */
    private int handleMultiple = 2;
    /** Passed to KafkaMessageReceiverRetry; a retry helper is created only when this is > 0 and batch is NON_BATCH. */
    private int retryCount = 3;
    /** Capacity of blockingQueue, counted in poll results (ConsumerRecords), not individual records. */
    private int queueSize = 100000;
    /** Pause in milliseconds after every worker loop iteration. */
    private long threadSleep = 0L;
    /** Timeout passed to KafkaConsumer.poll and, in milliseconds, to blockingQueue.poll. */
    private long pollTimeout = 2000L;
    /** Passed to KafkaMessageReceiverMonitor; presumably milliseconds - TODO confirm against the monitor. */
    private long monitorIntervalTime = 30000L;
    /** Passed to KafkaMessageReceiverMonitor; a monitor is created only when this is > 0 and model is MODEL_2. */
    private int monitorPercentage = 50;
    /** Adapter that receives every record or record batch. */
    private KafkaMessageAdapter<?, ?> messageAdapter;
    /** Destination whose name is used as the topic to subscribe to. */
    private KafkaDestination destination;
    /** Retry helper for records whose handling failed; may be null (see init()). */
    private KafkaMessageReceiverRetry<ConsumerRecord<K, V>> receiverRetry;
    /** Monitor constructed over blockingQueue; may be null (see init()). */
    private KafkaMessageReceiverMonitor<ConsumerRecords<K, V>> receiverMonitor;
    /** Callback handed to commitAsync by commit(); it only logs failed commits. */
    protected OffsetCommitCallback offsetCommitCallback = new OffsetCommitCallback(){

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            // A null exception means the commit succeeded; nothing to report.
            if (exception != null) {
                logger.error("Offset commit with offsets {} failed", offsets, (Object)exception);
            }
        }
    };

    /**
     * Returns the consumer properties held by this pool.
     *
     * @return the live Properties object, not a copy
     */
    public Properties getProps() {
        return this.props;
    }

    /**
     * Replaces the consumer properties object.
     * Entries that setConfig() or setCommit() wrote into the previous object
     * are not carried over.
     *
     * @param props the new consumer properties
     */
    public void setProps(Properties props) {
        this.props = props;
    }

    /**
     * Returns the handler multiplier; in MODEL_2 init() sizes the handler pool
     * as {@code poolSize * handleMultiple + 1}.
     *
     * @return the handler multiplier
     */
    public int getHandleMultiple() {
        return this.handleMultiple;
    }

    /**
     * Sets the handler multiplier read by init() when it sizes the MODEL_2 handler pool.
     *
     * @param handleMultiple handler threads per receiver thread (one extra is always added)
     */
    public void setHandleMultiple(int handleMultiple) {
        this.handleMultiple = handleMultiple;
    }

    /**
     * Returns the retry count that init() passes to the retry helper.
     *
     * @return the retry count
     */
    public int getRetryCount() {
        return this.retryCount;
    }

    /**
     * Sets the retry count. init() creates a retry helper only when this value
     * is greater than 0 and the batch mode is NON_BATCH.
     *
     * @param retryCount the retry count
     */
    public void setRetryCount(int retryCount) {
        this.retryCount = retryCount;
    }

    /**
     * Returns the number of receiver threads. After init() this is the value
     * actually used, which may differ from the value that was set.
     *
     * @return the receiver pool size
     */
    public int getPoolSize() {
        return this.poolSize;
    }

    /**
     * Sets the number of receiver threads. init() replaces a value of 0, or a
     * value larger than the topic's partition count, with the partition count.
     *
     * @param poolSize the requested receiver pool size
     */
    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    /**
     * Returns the capacity used for the MODEL_2 hand-off queue.
     *
     * @return the queue capacity
     */
    public int getQueueSize() {
        return this.queueSize;
    }

    /**
     * Sets the capacity of the MODEL_2 hand-off queue; read by init() when the
     * queue is created.
     *
     * @param queueSize the queue capacity, counted in poll results
     */
    public void setQueueSize(int queueSize) {
        this.queueSize = queueSize;
    }

    /**
     * Returns the pause each worker thread takes after every loop iteration.
     *
     * @return the pause in milliseconds
     */
    public long getThreadSleep() {
        return this.threadSleep;
    }

    /**
     * Sets the pause each worker thread takes after every loop iteration.
     *
     * @param threadSleep the pause in milliseconds
     */
    public void setThreadSleep(long threadSleep) {
        this.threadSleep = threadSleep;
    }

    /**
     * Returns the timeout that receiver threads pass to KafkaConsumer.poll and
     * that handler threads use, in milliseconds, when polling the hand-off queue.
     *
     * @return the poll timeout
     */
    public long getPollTimeout() {
        return this.pollTimeout;
    }

    /**
     * Sets the timeout used for KafkaConsumer.poll and for polling the hand-off queue.
     *
     * @param pollTimeout the poll timeout
     */
    public void setPollTimeout(long pollTimeout) {
        this.pollTimeout = pollTimeout;
    }

    /**
     * Returns the interval that init() passes to the queue monitor.
     *
     * @return the monitor interval (presumably milliseconds - confirm against the monitor)
     */
    public long getMonitorIntervalTime() {
        return this.monitorIntervalTime;
    }

    /**
     * Sets the interval that init() passes to the queue monitor.
     *
     * @param monitorIntervalTime the monitor interval (presumably milliseconds - confirm against the monitor)
     */
    public void setMonitorIntervalTime(long monitorIntervalTime) {
        this.monitorIntervalTime = monitorIntervalTime;
    }

    /**
     * Returns the percentage that init() passes to the queue monitor.
     *
     * @return the monitor percentage
     */
    public int getMonitorPercentage() {
        return this.monitorPercentage;
    }

    /**
     * Sets the percentage passed to the queue monitor. init() creates a monitor
     * only when this value is greater than 0 and the model is MODEL_2.
     *
     * @param monitorPercentage the monitor percentage
     */
    public void setMonitorPercentage(int monitorPercentage) {
        this.monitorPercentage = monitorPercentage;
    }

    /**
     * Returns the resource most recently passed to setConfig().
     *
     * @return the configuration resource, or null if none was set
     */
    public Resource getConfig() {
        return this.config;
    }

    /**
     * Remembers the configuration resource and merges its entries into the
     * consumer properties. A load failure is logged and otherwise ignored, so
     * the properties are left as they were before the call.
     *
     * @param config resource containing consumer properties
     */
    public void setConfig(Resource config) {
        this.config = config;
        try {
            PropertiesLoaderUtils.fillProperties(props, config);
        } catch (IOException loadFailure) {
            logger.error("Fill properties failed.", loadFailure);
        }
    }

    /**
     * Returns the name of the current threading model.
     *
     * @return the MQ_MODEL constant name
     */
    public String getModel() {
        return this.model.name();
    }

    /**
     * Sets the threading model from its constant name. Also invoked by
     * setMessageAdapter() when the adapter supplies a model.
     *
     * @param model name of an MQ_MODEL constant
     * @throws IllegalArgumentException if no MQ_MODEL constant has that name
     */
    @Deprecated
    public void setModel(String model) {
        this.model = MQ_MODEL.valueOf(model);
    }

    /**
     * Returns the name of the current batch mode.
     *
     * @return the MQ_BATCH constant name
     */
    public String getBatch() {
        return this.batch.name();
    }

    /**
     * Sets the batch mode from its constant name. Also invoked by
     * setMessageAdapter() when the adapter supplies a batch mode.
     *
     * @param batch name of an MQ_BATCH constant
     * @throws IllegalArgumentException if no MQ_BATCH constant has that name
     */
    @Deprecated
    public void setBatch(String batch) {
        this.batch = MQ_BATCH.valueOf(batch);
    }

    /**
     * Returns the name of the current offset commit strategy.
     *
     * @return the COMMIT constant name
     */
    public String getCommit() {
        return this.commit.name();
    }

    /**
     * Selects the offset commit strategy by constant name. For any strategy
     * other than AUTO_COMMIT the consumer property {@code enable.auto.commit}
     * is set to {@code "false"}; selecting AUTO_COMMIT leaves that property
     * untouched.
     *
     * @param commit name of a COMMIT constant
     * @throws IllegalArgumentException if no COMMIT constant has that name
     */
    public void setCommit(String commit) {
        COMMIT strategy = COMMIT.valueOf(commit);
        this.commit = strategy;
        if (strategy == COMMIT.AUTO_COMMIT) {
            return;
        }
        // Explicit commits are issued by this pool, so the client must not auto-commit.
        props.setProperty("enable.auto.commit", "false");
    }

    /**
     * Returns the adapter that receives consumed records.
     *
     * @return the message adapter, or null if none was set
     */
    public KafkaMessageAdapter<?, ?> getMessageAdapter() {
        return this.messageAdapter;
    }

    /**
     * Installs the message adapter and adopts whichever of its model, batch
     * mode and destination are non-null; settings the adapter leaves null keep
     * their current value on this pool.
     *
     * @param messageAdapter adapter that will receive consumed records
     */
    public void setMessageAdapter(KafkaMessageAdapter<?, ?> messageAdapter) {
        this.messageAdapter = messageAdapter;

        // Threading model supplied by the adapter, if any.
        if (messageAdapter.getModel() != null) {
            setModel(messageAdapter.getModel());
        }

        // Batch mode supplied by the adapter, if any.
        if (messageAdapter.getBatch() != null) {
            setBatch(messageAdapter.getBatch());
        }

        // Destination (topic) supplied by the adapter, if any.
        if (messageAdapter.getDestination() != null) {
            setDestination(messageAdapter.getDestination());
        }
    }

    /**
     * Returns the {@code client.id} consumer property.
     *
     * @return the configured client id, or {@code "client_new_consumer"} when the property is absent
     */
    public String getClientId() {
        return this.props.getProperty("client.id", "client_new_consumer");
    }

    /**
     * Returns the {@code group.id} consumer property.
     *
     * @return the configured group id, or {@code "group_new_consumer"} when the property is absent
     */
    public String getGroupId() {
        return this.props.getProperty("group.id", "group_new_consumer");
    }

    /**
     * Returns the destination whose name init() uses as the topic.
     *
     * @return the destination, or null if none was set
     */
    public KafkaDestination getDestination() {
        return this.destination;
    }

    /**
     * Sets the destination whose name init() uses as the topic. Also invoked
     * by setMessageAdapter() when the adapter supplies a destination.
     *
     * @param destination the destination to consume from
     */
    public void setDestination(KafkaDestination destination) {
        this.destination = destination;
    }

    /**
     * Creates a stand-alone receiver from a copy of the pool's consumer
     * properties. The copy's {@code group.id} and {@code client.id} are always
     * overwritten with the fixed values {@code "group_new_consumer"} and
     * {@code "client_new_consumer"}, whatever the pool itself is configured with.
     *
     * @return a new receiver; hand it back through returnReceiver() when done
     */
    @Override
    public KafkaMessageReceiver<K, V> getReceiver() {
        Properties receiverProps = (Properties) props.clone();
        receiverProps.setProperty("client.id", "client_new_consumer");
        receiverProps.setProperty("group.id", "group_new_consumer");
        return new KafkaMessageNewReceiver(receiverProps);
    }

    /**
     * Releases a receiver obtained from getReceiver() by shutting it down.
     *
     * @param receiver the receiver to release; null is ignored
     */
    @Override
    public void returnReceiver(KafkaMessageReceiver<K, V> receiver) {
        if (receiver == null) {
            return;
        }
        receiver.shutDown();
    }

    /**
     * Starts the pool.
     * <ol>
     *   <li>Looks up the topic's partition count with a temporary receiver and
     *       limits the receiver pool size to it; a non-positive pool size means
     *       "one receiver per partition".</li>
     *   <li>Creates the receiver executor and, for MODEL_2, the hand-off queue
     *       and the handler executor with {@code poolSize * handleMultiple + 1}
     *       handler tasks.</li>
     *   <li>Creates the retry helper (NON_BATCH with retryCount &gt; 0) and the
     *       queue monitor (MODEL_2 with monitorPercentage &gt; 0).</li>
     *   <li>Submits one ReceiverThread per pool slot, each with its own
     *       {@code client.id} of the form {@code <clientId>-<topic>-<index>}.</li>
     * </ol>
     * The destination and message adapter must have been set beforehand.
     */
    @Override
    public synchronized void init() {
        String topic = this.destination.getDestinationName();

        // Probe the partition count with a temporary receiver and release it
        // even when the lookup throws, so its consumer is never leaked.
        int partSize;
        KafkaMessageReceiver<K, V> receiver = this.getReceiver();
        try {
            partSize = receiver.getPartitionCount(topic);
        } finally {
            this.returnReceiver(receiver);
        }

        // Never start more receivers than there are partitions; an unset or
        // negative size falls back to the partition count.
        if (this.poolSize <= 0 || this.poolSize > partSize) {
            this.setPoolSize(partSize);
        }

        switch (this.model) {
            case MODEL_1: {
                this.receivPool = Executors.newFixedThreadPool(this.poolSize, new KafkaPoolThreadFactory("ReceiverThread-" + topic));
                break;
            }
            case MODEL_2: {
                int handSize = this.poolSize * this.handleMultiple + 1;
                this.blockingQueue = new LinkedBlockingQueue<ConsumerRecords<K, V>>(this.queueSize);
                this.receivPool = Executors.newFixedThreadPool(this.poolSize, new KafkaPoolThreadFactory("ReceiverThread-" + topic));
                this.handlePool = Executors.newFixedThreadPool(handSize, new KafkaPoolThreadFactory("HandlerThread-" + topic));
                for (int i = 0; i < handSize; ++i) {
                    HandlerThread handlerThread = new HandlerThread(this.messageAdapter);
                    this.handleThreads.add(handlerThread);
                    this.handlePool.submit(handlerThread);
                }
                logger.info("Message Handler Pool initialized. PoolSize : {}", handSize);
                break;
            }
            default: {
                break;
            }
        }

        // Per-record retry only makes sense when records are handled one at a time.
        if (this.retryCount > 0 && this.batch == MQ_BATCH.NON_BATCH) {
            this.receiverRetry = new KafkaMessageReceiverRetry(topic, this.retryCount, this.messageAdapter);
        }
        // The monitor watches the hand-off queue, which exists only in MODEL_2.
        if (this.monitorPercentage > 0 && this.model == MQ_MODEL.MODEL_2) {
            this.receiverMonitor = new KafkaMessageReceiverMonitor<ConsumerRecords<K, V>>(topic, this.monitorIntervalTime, this.monitorPercentage, this.blockingQueue);
        }

        for (int i = 0; i < this.poolSize; ++i) {
            // Each consumer gets its own copy of the properties and a unique client id.
            Properties properties = (Properties) this.props.clone();
            properties.setProperty("client.id", this.getClientId() + "-" + topic + "-" + i);
            ReceiverThread receiverThread = new ReceiverThread(properties, topic, this.messageAdapter);
            this.receivThreads.add(receiverThread);
            this.receivPool.submit(receiverThread);
        }
        logger.info("Message Receiver Pool initialized. PoolSize : {}", this.poolSize);
        this.running.set(true);
    }

    /**
     * Stops the pool and waits for its workers to finish.
     * <p>
     * Order of operations: stop the receivers and wait for their executor to
     * terminate; wait until the hand-off queue (if any) has been emptied by the
     * handlers; stop the handlers and wait for their executor; destroy the
     * retry helper and the monitor; finally mark the pool as not running.
     * <p>
     * All waits block the calling thread without spinning. If the calling
     * thread is interrupted, the remaining waits are skipped, the interrupt
     * flag is restored and the rest of the shutdown sequence still runs.
     */
    @Override
    public synchronized void destroy() {
        for (ReceiverThread receiverThread : this.receivThreads) {
            receiverThread.shutdown();
        }
        this.receivThreads.clear();
        if (this.receivPool != null) {
            this.receivPool.shutdown();
            this.awaitPoolTermination(this.receivPool, "Message Receiver pool");
        }

        // Let the handlers finish what the receivers already queued before stopping them.
        if (this.blockingQueue != null) {
            this.awaitQueueDrained();
        }

        for (HandlerThread handlerThread : this.handleThreads) {
            handlerThread.shutdown();
        }
        this.handleThreads.clear();
        if (this.handlePool != null) {
            this.handlePool.shutdown();
            this.awaitPoolTermination(this.handlePool, "Message Handler pool");
        }

        if (this.receiverRetry != null) {
            this.receiverRetry.destroy();
        }
        if (this.receiverMonitor != null) {
            this.receiverMonitor.destroy();
        }
        this.running.set(false);
    }

    /** Blocks until the given executor has terminated or the calling thread is interrupted. */
    private void awaitPoolTermination(ExecutorService pool, String poolName) {
        try {
            while (!pool.awaitTermination(1L, TimeUnit.SECONDS)) {
                logger.debug("Waiting for {} to terminate.", poolName);
            }
            logger.info(poolName + " closed.");
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop waiting.
            Thread.currentThread().interrupt();
            logger.warn(poolName + " did not terminate before destroy() was interrupted.", e);
        }
    }

    /** Blocks until the hand-off queue is empty or the calling thread is interrupted. */
    private void awaitQueueDrained() {
        final long checkIntervalMillis = 50L;
        try {
            while (!this.blockingQueue.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(checkIntervalMillis);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller and stop waiting.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for the handler queue to drain; "
                    + this.blockingQueue.size() + " batch(es) still queued.", e);
        }
    }

    /**
     * Reports whether the pool is running: true once init() has completed,
     * false again once destroy() has completed.
     * <p>
     * The method is synchronized on the same monitor as init() and destroy(),
     * so a call made while either of them is executing blocks until it returns.
     *
     * @return the current value of the running flag
     */
    @Override
    public synchronized boolean isRunning() {
        return this.running.get();
    }

    /**
     * Commits the offset following the given record for that record's
     * partition, using the requested strategy. SYNC_COMMIT commits
     * synchronously; ASYNC_COMMIT commits asynchronously and reports failures
     * through offsetCommitCallback; any other strategy commits nothing here.
     *
     * @param consumer consumer that produced the record
     * @param record   the record that has just been processed
     * @param commit   commit strategy to apply
     */
    private void commit(KafkaConsumer<K, V> consumer, ConsumerRecord<K, V> record, COMMIT commit) {
        switch (commit) {
            case SYNC_COMMIT:
                consumer.commitSync(this.offsetAfter(record));
                break;
            case ASYNC_COMMIT:
                consumer.commitAsync(this.offsetAfter(record), this.offsetCommitCallback);
                break;
            default:
                break;
        }
    }

    /** Builds the single-entry commit map pointing at the offset right after the record. */
    private Map<TopicPartition, OffsetAndMetadata> offsetAfter(ConsumerRecord<K, V> record) {
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1L);
        return Collections.singletonMap(partition, next);
    }

    /**
     * Commits the offsets returned by the consumer's last poll, using the
     * requested strategy. SYNC_COMMIT commits synchronously; ASYNC_COMMIT
     * commits asynchronously and, like commit(), reports failures through
     * offsetCommitCallback instead of dropping them silently; any other
     * strategy commits nothing here.
     *
     * @param consumer consumer whose polled offsets are committed
     * @param commit   commit strategy to apply
     */
    private void batchCommit(KafkaConsumer<K, V> consumer, COMMIT commit) {
        switch (commit) {
            case SYNC_COMMIT: {
                consumer.commitSync();
                break;
            }
            case ASYNC_COMMIT: {
                // Pass the callback so a failed asynchronous commit is logged.
                consumer.commitAsync(this.offsetCommitCallback);
                break;
            }
            default: {
                break;
            }
        }
    }

    /**
     * Hands a record whose processing failed to the retry helper. Does nothing
     * when no retry helper has been created.
     *
     * @param consumerRecord the record to retry
     */
    private void messageReceiveRetry(ConsumerRecord<K, V> consumerRecord) {
        if (receiverRetry == null) {
            return;
        }
        receiverRetry.receiveMessageRetry(consumerRecord);
    }

    /**
     * Pauses the calling thread for the given number of milliseconds.
     * If the thread is interrupted while sleeping, the method returns early,
     * logs the event and restores the thread's interrupt flag so that callers
     * and blocking calls further down can still observe the interruption.
     *
     * @param ms pause length in milliseconds; values of 0 or less do not sleep
     */
    private void waitAmoment(long ms) {
        try {
            TimeUnit.MILLISECONDS.sleep(ms);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again.
            Thread.currentThread().interrupt();
            logger.warn("Thread sleep interrupted.", e);
        }
    }

    class HandlerThread
    implements Runnable {
        public static final String tagger = "HandlerThread";
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaMessageAdapter<?, ?> adapter;

        public HandlerThread(KafkaMessageAdapter<?, ?> adapter) {
            this.adapter = adapter;
        }

        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            while (!this.closed.get()) {
                ConsumerRecords records = null;
                try {
                    records = KafkaMessageNewReceiverPool.this.blockingQueue.poll(KafkaMessageNewReceiverPool.this.pollTimeout, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    logger.error("BlockingQueue take failed.", (Throwable)e);
                }
                if (records != null) {
                    switch (KafkaMessageNewReceiverPool.this.batch) {
                        case BATCH: {
                            try {
                                this.adapter.messageAdapter(records);
                            }
                            catch (MQException e) {
                                logger.error("Receive message failed. failSize: " + records.count(), (Throwable)e);
                            }
                            break;
                        }
                        case NON_BATCH: {
                            for (ConsumerRecord record : records) {
                                try {
                                    this.adapter.messageAdapter(record);
                                }
                                catch (MQException e) {
                                    KafkaMessageNewReceiverPool.this.messageReceiveRetry(record);
                                    logger.error("Receive message failed. topic: " + record.topic() + " offset: " + record.offset() + " partition: " + record.partition(), (Throwable)e);
                                }
                            }
                            break;
                        }
                    }
                }
                KafkaMessageNewReceiverPool.this.waitAmoment(KafkaMessageNewReceiverPool.this.threadSleep);
            }
            logger.info(Thread.currentThread().getName() + " end.");
        }

        public void shutdown() {
            this.closed.set(true);
        }
    }

    class ReceiverThread
    implements Runnable {
        public static final String tagger = "ReceiverThread";
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<K, V> consumer;
        private final KafkaMessageAdapter<?, ?> adapter;
        private final String topic;

        public ReceiverThread(Properties props, String topic, KafkaMessageAdapter<?, ?> adapter) {
            this.topic = topic;
            this.adapter = adapter;
            this.consumer = new KafkaConsumer(props);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            try {
                this.consumer.subscribe(Arrays.asList(this.topic));
                while (!this.closed.get()) {
                    ConsumerRecords records = this.consumer.poll(KafkaMessageNewReceiverPool.this.pollTimeout);
                    switch (KafkaMessageNewReceiverPool.this.model) {
                        case MODEL_1: {
                            switch (KafkaMessageNewReceiverPool.this.batch) {
                                case BATCH: {
                                    try {
                                        if (records.isEmpty()) break;
                                        this.adapter.messageAdapter(records);
                                        break;
                                    }
                                    catch (MQException e) {
                                        logger.error("Receive message failed. failSize:" + records.count(), (Throwable)e);
                                        break;
                                    }
                                    finally {
                                        KafkaMessageNewReceiverPool.this.batchCommit(this.consumer, KafkaMessageNewReceiverPool.this.commit);
                                    }
                                }
                                case NON_BATCH: {
                                    for (ConsumerRecord record : records) {
                                        try {
                                            this.adapter.messageAdapter(record);
                                        }
                                        catch (MQException e) {
                                            KafkaMessageNewReceiverPool.this.messageReceiveRetry(record);
                                            logger.error("Receive message failed. topic: " + record.topic() + " offset: " + record.offset() + " partition: " + record.partition(), (Throwable)e);
                                        }
                                        finally {
                                            KafkaMessageNewReceiverPool.this.commit(this.consumer, record, KafkaMessageNewReceiverPool.this.commit);
                                        }
                                    }
                                    break;
                                }
                            }
                            break;
                        }
                        case MODEL_2: {
                            try {
                                if (!records.isEmpty()) {
                                    KafkaMessageNewReceiverPool.this.blockingQueue.put(records);
                                }
                            }
                            catch (InterruptedException e) {
                                logger.error("BlockingQueue put failed.", (Throwable)e);
                            }
                            KafkaMessageNewReceiverPool.this.batchCommit(this.consumer, KafkaMessageNewReceiverPool.this.commit);
                        }
                    }
                    KafkaMessageNewReceiverPool.this.waitAmoment(KafkaMessageNewReceiverPool.this.threadSleep);
                }
            }
            catch (WakeupException e) {
                if (!this.closed.get()) {
                    throw e;
                }
            }
            finally {
                this.consumer.close();
            }
            logger.info(Thread.currentThread().getName() + " end.");
        }

        public void shutdown() {
            this.closed.set(true);
            this.consumer.wakeup();
        }
    }

    /** Offset commit strategies selectable through setCommit(). */
    public static enum COMMIT {
        /** The pool issues no explicit commits; setCommit() leaves {@code enable.auto.commit} untouched. */
        AUTO_COMMIT,
        /** The pool commits with KafkaConsumer.commitSync; setCommit() sets {@code enable.auto.commit} to "false". */
        SYNC_COMMIT,
        /** The pool commits with KafkaConsumer.commitAsync; setCommit() sets {@code enable.auto.commit} to "false". */
        ASYNC_COMMIT;

    }
}

