/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.pool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.message.MessageAndMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.darkphoenixs.kafka.core.KafkaMessageAdapter;
import org.darkphoenixs.kafka.pool.KafkaPoolThreadFactory;
import org.darkphoenixs.mq.exception.MQException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageReceiverRetry<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageReceiverRetry.class);
    private final int errorTimeout = 3000;
    private final int errorQueueSize = 10000;
    private final int errorPoolSize = 1;
    private final int retryCount;
    protected final BlockingQueue<T> errorMessageQueue;
    protected final ExecutorService errorMessagePool;
    protected final List<RetryThread> errorRetryThreads;
    protected final ConcurrentMap<String, AtomicInteger> errorMessageCount;

    public KafkaMessageReceiverRetry(String topic, int retryCount, KafkaMessageAdapter<?, ?> messageAdapter) {
        this.retryCount = retryCount;
        this.errorMessageQueue = new LinkedBlockingQueue<T>(10000);
        this.errorRetryThreads = new ArrayList<RetryThread>(1);
        this.errorMessagePool = Executors.newFixedThreadPool(1, new KafkaPoolThreadFactory("RetryThread-" + topic));
        this.errorMessageCount = new ConcurrentHashMap<String, AtomicInteger>();
        RetryThread retryThread = new RetryThread(messageAdapter);
        this.errorRetryThreads.add(retryThread);
        this.errorMessagePool.submit(retryThread);
    }

    public void receiveMessageRetry(T record) {
        try {
            ConsumerRecord consumerRecord;
            if (record instanceof MessageAndMetadata) {
                MessageAndMetadata messageAndMetadata = (MessageAndMetadata)record;
                if (0 < this.errorMessageCount(messageAndMetadata.topic(), messageAndMetadata.partition(), messageAndMetadata.offset())) {
                    this.errorMessageQueue.offer(record, 3000L, TimeUnit.MILLISECONDS);
                }
            } else if (record instanceof ConsumerRecord && 0 < this.errorMessageCount((consumerRecord = (ConsumerRecord)record).topic(), consumerRecord.partition(), consumerRecord.offset())) {
                this.errorMessageQueue.offer(record, 3000L, TimeUnit.MILLISECONDS);
            }
        }
        catch (InterruptedException e) {
            logger.error("BlockingQueue offer failed.", (Throwable)e);
        }
    }

    public void receiveMessageClean(T record) {
        if (record instanceof MessageAndMetadata) {
            MessageAndMetadata messageAndMetadata = (MessageAndMetadata)record;
            this.errorMessageCount.remove(this.errorMessageKey(messageAndMetadata.topic(), messageAndMetadata.partition(), messageAndMetadata.offset()));
        } else if (record instanceof ConsumerRecord) {
            ConsumerRecord consumerRecord = (ConsumerRecord)record;
            this.errorMessageCount.remove(this.errorMessageKey(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()));
        }
    }

    public int receiveMessageCount(T record) {
        if (record instanceof MessageAndMetadata) {
            MessageAndMetadata messageAndMetadata = (MessageAndMetadata)record;
            return ((AtomicInteger)this.errorMessageCount.get(this.errorMessageKey(messageAndMetadata.topic(), messageAndMetadata.partition(), messageAndMetadata.offset()))).get();
        }
        if (record instanceof ConsumerRecord) {
            ConsumerRecord consumerRecord = (ConsumerRecord)record;
            return ((AtomicInteger)this.errorMessageCount.get(this.errorMessageKey(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset()))).get();
        }
        return 0;
    }

    public void destroy() {
        if (this.errorMessageQueue != null) {
            while (!this.errorMessageQueue.isEmpty()) {
            }
        }
        for (RetryThread thread : this.errorRetryThreads) {
            thread.shutdown();
        }
        this.errorRetryThreads.clear();
        if (this.errorMessagePool != null) {
            this.errorMessagePool.shutdown();
            while (!this.errorMessagePool.isTerminated()) {
            }
        }
        logger.info("Message Retry pool closed.");
    }

    private String errorMessageKey(String _topic, int _partition, long _offset) {
        return _topic + "_" + _partition + "_" + _offset;
    }

    protected int errorMessageCount(String topic, int partition, long offset) {
        if (this.retryCount == 0) {
            return 0;
        }
        String errorKey = this.errorMessageKey(topic, partition, offset);
        AtomicInteger count = (AtomicInteger)this.errorMessageCount.get(errorKey);
        if (null == count) {
            count = this.errorMessageCount.putIfAbsent(errorKey, new AtomicInteger(1));
        }
        if (null != count) {
            if (this.retryCount > count.get()) {
                return count.incrementAndGet();
            }
            this.errorMessageCount.remove(errorKey);
            return 0;
        }
        return 1;
    }

    class RetryThread
    implements Runnable {
        public static final String tagger = "RetryThread";
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaMessageAdapter<?, ?> adapter;

        public RetryThread(KafkaMessageAdapter<?, ?> adapter) {
            this.adapter = adapter;
        }

        @Override
        public void run() {
            logger.info(Thread.currentThread().getName() + " start.");
            while (!this.closed.get()) {
                Object record = null;
                try {
                    record = KafkaMessageReceiverRetry.this.errorMessageQueue.poll(3000L, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    logger.error("BlockingQueue poll failed.", (Throwable)e);
                }
                if (record == null) continue;
                int retries = KafkaMessageReceiverRetry.this.receiveMessageCount(record);
                try {
                    logger.warn("Retry receive message. Number of retries: " + retries);
                    this.receiveMessageAdapter(record);
                    KafkaMessageReceiverRetry.this.receiveMessageClean(record);
                }
                catch (MQException e) {
                    KafkaMessageReceiverRetry.this.receiveMessageRetry(record);
                    this.receiveMessageError(record, retries, e);
                }
            }
            logger.info(Thread.currentThread().getName() + " end.");
        }

        public void receiveMessageAdapter(T record) throws MQException {
            if (record instanceof ConsumerRecord) {
                ConsumerRecord consumerRecord = (ConsumerRecord)record;
                this.adapter.messageAdapter(consumerRecord);
            } else if (record instanceof MessageAndMetadata) {
                MessageAndMetadata messageAndMetadata = (MessageAndMetadata)record;
                this.adapter.messageAdapter(messageAndMetadata);
            }
        }

        public void receiveMessageError(T record, int retries, MQException e) {
            if (record instanceof ConsumerRecord) {
                ConsumerRecord consumerRecord = (ConsumerRecord)record;
                logger.error("Receive message failed. retries: " + retries + " topic: " + consumerRecord.topic() + " offset: " + consumerRecord.offset() + " partition: " + consumerRecord.partition(), (Throwable)e);
            } else if (record instanceof MessageAndMetadata) {
                MessageAndMetadata messageAndMetadata = (MessageAndMetadata)record;
                logger.error("Receive message failed. retries: " + retries + " topic: " + messageAndMetadata.topic() + " offset: " + messageAndMetadata.offset() + " partition: " + messageAndMetadata.partition(), (Throwable)e);
            }
        }

        public void shutdown() {
            this.closed.set(true);
        }
    }
}

