/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.core;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import kafka.serializer.Decoder;
import kafka.utils.VerifiableProperties;
import org.darkphoenixs.kafka.core.KafkaMessageReceiver;
import org.darkphoenixs.kafka.core.ZookeeperBrokers;
import org.darkphoenixs.kafka.core.ZookeeperHosts;
import org.darkphoenixs.kafka.pool.KafkaMessageReceiverPool;
import org.darkphoenixs.mq.exception.MQException;
import org.darkphoenixs.mq.util.RefleTool;

/**
 * {@link KafkaMessageReceiver} implementation built on the low-level Kafka
 * {@link SimpleConsumer}.
 *
 * <p>The receiver lazily connects to the leader broker of the requested
 * topic/partition (seed brokers are read through {@link ZookeeperBrokers}),
 * fetches messages starting at a caller-supplied offset and decodes them with
 * the decoder classes supplied by the {@link KafkaMessageReceiverPool}.
 *
 * <p>All public methods that touch the cached consumer, metadata or fetch
 * response are {@code synchronized} on this instance;
 * {@link #getPartitionCount(String)} is not.
 *
 * <p>NOTE(review): {@code logger} is not declared in this file; it is
 * presumably a constant inherited from {@link KafkaMessageReceiver} - confirm.
 *
 * @param <K> decoded key type
 * @param <V> decoded value type
 */
public class KafkaMessageReceiverImpl<K, V> implements KafkaMessageReceiver<K, V> {
    /** Number of attempts for fetching data and for locating a new leader. */
    private static final int MAX_ATTEMPTS = 3;
    /** Socket timeout handed to every {@link SimpleConsumer}. */
    private static final int SOCKET_TIMEOUT_MS = 100000;
    /** Socket buffer size handed to every {@link SimpleConsumer}. */
    private static final int SOCKET_BUFFER_SIZE = 65536;
    /** Fetch size handed to the fetch request builder. */
    private static final int FETCH_SIZE = 100000;
    /** Pause between two leader lookups while waiting for a new leader. */
    private static final long LEADER_RETRY_SLEEP_MS = 1000L;
    /** Client id used by the short-lived metadata lookup consumers. */
    private static final String LEADER_LOOKUP_CLIENT_ID = "leaderLookup";

    /** Consumer connected to the current partition leader, or {@code null} when disconnected. */
    private final AtomicReference<SimpleConsumer> consumer = new AtomicReference<SimpleConsumer>();
    /** Host to port map of the brokers used as seeds for leader lookups. */
    protected Map<String, Integer> replicaBrokers;
    /** Cached metadata of the partition last resolved, or {@code null} if unknown. */
    protected PartitionMetadata metadata;
    /** Response of the most recent fetch request. */
    protected FetchResponse fetchResponse;
    private KafkaMessageReceiverPool<K, V> pool;
    private VerifiableProperties props;

    /**
     * Creates a receiver.
     *
     * @param props properties wrapped into {@link VerifiableProperties} and passed to the decoders
     * @param pool  pool supplying client id, Zookeeper connection string and decoder classes
     */
    public KafkaMessageReceiverImpl(Properties props, KafkaMessageReceiverPool<K, V> pool) {
        this.pool = pool;
        this.props = new VerifiableProperties(props);
        this.replicaBrokers = new LinkedHashMap<String, Integer>();
    }

    /**
     * Fetches and decodes the values of the messages whose offsets lie in
     * {@code [beginOffset, beginOffset + readOffset - 1]}.
     *
     * @param topic       topic name
     * @param partition   partition id
     * @param beginOffset first offset to return
     * @param readOffset  maximum number of offsets to cover; must be greater than 0
     * @return decoded values in fetch order (an element is {@code null} when the message has
     *         no payload); empty if the fetch failed after {@value #MAX_ATTEMPTS} attempts
     * @throws IllegalArgumentException if {@code readOffset <= 0}
     */
    @Override
    public synchronized List<V> receive(String topic, int partition, long beginOffset, long readOffset) {
        if (readOffset <= 0L) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        List<V> messages = new ArrayList<V>();
        if (!this.fetchWithRetry(topic, partition, beginOffset)) {
            return messages;
        }
        // One decoder per call instead of one reflective instantiation per message.
        Decoder<V> valDecoder = this.newValDecoder();
        for (MessageAndOffset messageAndOffset : this.fetchResponse.messageSet(topic, partition)) {
            long currentOffset = messageAndOffset.offset();
            if (currentOffset < beginOffset) {
                // A fetch may hand back messages located before the requested offset
                // (e.g. from a compressed message set); they were not asked for.
                continue;
            }
            // Subtraction form of "currentOffset > beginOffset + readOffset - 1" that cannot overflow.
            if (currentOffset - beginOffset >= readOffset) {
                break;
            }
            messages.add(decode(valDecoder, messageAndOffset.message().payload()));
        }
        return messages;
    }

    /**
     * Fetches and decodes key/value pairs of the messages whose offsets lie in
     * {@code [beginOffset, beginOffset + readOffset - 1]}.
     *
     * <p>Because the result is a map, a later message replaces an earlier one with an equal
     * key. Messages without a key are stored under the {@code null} key.
     *
     * @param topic       topic name
     * @param partition   partition id
     * @param beginOffset first offset to return
     * @param readOffset  maximum number of offsets to cover; must be greater than 0
     * @return insertion-ordered map of decoded keys to decoded values; empty if the fetch
     *         failed after {@value #MAX_ATTEMPTS} attempts
     * @throws IllegalArgumentException if {@code readOffset <= 0}
     */
    @Override
    public synchronized Map<K, V> receiveWithKey(String topic, int partition, long beginOffset, long readOffset) {
        if (readOffset <= 0L) {
            throw new IllegalArgumentException("read offset must be greater than 0");
        }
        Map<K, V> messages = new LinkedHashMap<K, V>();
        if (!this.fetchWithRetry(topic, partition, beginOffset)) {
            return messages;
        }
        Decoder<K> keyDecoder = this.newKeyDecoder();
        Decoder<V> valDecoder = this.newValDecoder();
        for (MessageAndOffset messageAndOffset : this.fetchResponse.messageSet(topic, partition)) {
            long currentOffset = messageAndOffset.offset();
            if (currentOffset < beginOffset) {
                // Skip messages located before the requested offset (see receive()).
                continue;
            }
            if (currentOffset - beginOffset >= readOffset) {
                break;
            }
            K key = decode(keyDecoder, messageAndOffset.message().key());
            V val = decode(valDecoder, messageAndOffset.message().payload());
            messages.put(key, val);
        }
        return messages;
    }

    /**
     * Asks the partition leader for the latest offset.
     *
     * @param topic     topic name
     * @param partition partition id
     * @return the first offset reported by the broker, {@code 0} if the broker reported an
     *         error or no offset, {@code -1} if no consumer could be established
     */
    @Override
    public synchronized long getLatestOffset(String topic, int partition) {
        return this.fetchOffset(topic, partition, kafka.api.OffsetRequest.LatestTime());
    }

    /**
     * Asks the partition leader for the earliest offset.
     *
     * @param topic     topic name
     * @param partition partition id
     * @return the first offset reported by the broker, {@code 0} if the broker reported an
     *         error or no offset, {@code -1} if no consumer could be established
     */
    @Override
    public synchronized long getEarliestOffset(String topic, int partition) {
        return this.fetchOffset(topic, partition, kafka.api.OffsetRequest.EarliestTime());
    }

    /**
     * Returns the partition count reported by {@link ZookeeperBrokers} for the topic.
     *
     * @param topic topic name
     * @return number of partitions
     */
    @Override
    public int getPartitionCount(String topic) {
        return this.getPartitionNum(topic);
    }

    /** Closes the cached leader consumer, if any, and forgets it. */
    @Override
    public synchronized void shutDown() {
        this.closeConsumer();
    }

    /** Atomically detaches the cached consumer and closes it when present. */
    private void closeConsumer() {
        SimpleConsumer current = this.consumer.getAndSet(null);
        if (current != null) {
            current.close();
        }
    }

    /** Runs {@link #checkLeader} up to {@value #MAX_ATTEMPTS} times; true as soon as one fetch succeeds. */
    private boolean fetchWithRetry(String topic, int partition, long beginOffset) {
        for (int attempt = 0; attempt < MAX_ATTEMPTS; ++attempt) {
            if (this.checkLeader(topic, partition, beginOffset)) {
                return true;
            }
        }
        return false;
    }

    /** Instantiates the key decoder class configured in the pool. */
    @SuppressWarnings("unchecked") // the pool is parameterised with <K, V>; the decoder class is trusted to match
    private Decoder<K> newKeyDecoder() {
        return (Decoder<K>) RefleTool.newInstance(this.pool.getKeyDecoderClass(), this.props);
    }

    /** Instantiates the value decoder class configured in the pool. */
    @SuppressWarnings("unchecked") // the pool is parameterised with <K, V>; the decoder class is trusted to match
    private Decoder<V> newValDecoder() {
        return (Decoder<V>) RefleTool.newInstance(this.pool.getValDecoderClass(), this.props);
    }

    /** Copies the readable bytes of {@code buffer} and decodes them; a {@code null} buffer yields {@code null}. */
    private static <T> T decode(Decoder<T> decoder, ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        // remaining() rather than limit(): correct even when the buffer position is not 0.
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        return decoder.fromBytes(bytes);
    }

    /** Shared implementation of the latest/earliest offset queries; {@code whichTime} selects which one. */
    private long fetchOffset(String topic, int partition, long whichTime) {
        if (!this.checkConsumer(topic, partition)) {
            return -1L;
        }
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), this.pool.getClientId());
        OffsetResponse response = this.consumer.get().getOffsetsBefore(request);
        if (response.hasError()) {
            logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0L;
        }
        long[] offsets = response.offsets(topic, partition);
        if (offsets == null || offsets.length == 0) {
            logger.error("Broker returned no offsets for [" + topic + ", " + partition + "]");
            return 0L;
        }
        return offsets[0];
    }

    /**
     * Polls the known replica brokers until a leader is reported for the partition.
     *
     * <p>On the first attempt a leader equal to {@code a_oldLeader} is not accepted, giving
     * the cluster time to elect a new one; later attempts accept whatever leader is reported.
     *
     * @param a_oldLeader host of the leader that just failed
     * @param a_topic     topic name
     * @param a_partition partition id
     * @return metadata whose leader is non-null
     * @throws MQException if no leader was found after {@value #MAX_ATTEMPTS} attempts or the
     *                     thread was interrupted while waiting
     */
    private PartitionMetadata findNewLeader(String a_oldLeader, String a_topic, int a_partition) throws MQException {
        for (int i = 0; i < MAX_ATTEMPTS; ++i) {
            PartitionMetadata candidate = this.findLeader(this.replicaBrokers, a_topic, a_partition);
            boolean hasLeader = candidate != null && candidate.leader() != null;
            boolean staleLeader = hasLeader && i == 0 && a_oldLeader.equalsIgnoreCase(candidate.leader().host());
            if (hasLeader && !staleLeader) {
                return candidate;
            }
            if (i == MAX_ATTEMPTS - 1) {
                // Nothing left to retry, so sleeping again would only delay the failure.
                break;
            }
            try {
                TimeUnit.MILLISECONDS.sleep(LEADER_RETRY_SLEEP_MS);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to callers and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }
        logger.error("Unable to find new leader after Broker failure. Exiting");
        throw new MQException("Unable to find new leader after Broker failure. Exiting");
    }

    /**
     * Asks the seed brokers, in iteration order, for the metadata of the partition and stops
     * at the first broker that reports it. When found, {@link #replicaBrokers} is replaced by
     * the replicas listed in that metadata.
     *
     * @param a_seedBrokers host to port map of brokers to ask (may be {@link #replicaBrokers} itself)
     * @param a_topic       topic name
     * @param a_partition   partition id
     * @return the partition metadata, or {@code null} if no broker reported the partition
     */
    private PartitionMetadata findLeader(Map<String, Integer> a_seedBrokers, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        for (Map.Entry<String, Integer> entry : a_seedBrokers.entrySet()) {
            returnMetaData = this.lookupPartition(entry.getKey(), entry.getValue().intValue(), a_topic, a_partition);
            if (returnMetaData != null) {
                break;
            }
        }
        if (returnMetaData != null) {
            // Safe even when a_seedBrokers is replicaBrokers: the iteration above has finished.
            this.replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                this.replicaBrokers.put(replica.host(), replica.port());
            }
        }
        return returnMetaData;
    }

    /**
     * Queries one broker for the topic metadata and returns the entry of the given partition.
     * Communication errors are logged and reported as {@code null}; the temporary consumer is
     * always closed.
     */
    private PartitionMetadata lookupPartition(String host, int port, String topic, int partition) {
        SimpleConsumer lookupConsumer = null;
        try {
            lookupConsumer = new SimpleConsumer(host, port, SOCKET_TIMEOUT_MS, SOCKET_BUFFER_SIZE, LEADER_LOOKUP_CLIENT_ID);
            List<String> topics = Collections.singletonList(topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);
            TopicMetadataResponse resp = lookupConsumer.send(req);
            for (TopicMetadata item : resp.topicsMetadata()) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() == partition) {
                        return part;
                    }
                }
            }
        }
        catch (Exception e) {
            logger.error("Error communicating with Broker [" + host + "] to find Leader for [" + topic + ", " + partition + "] Reason: " + e, (Throwable)e);
        }
        finally {
            if (lookupConsumer != null) {
                lookupConsumer.close();
            }
        }
        return null;
    }

    /**
     * Issues one fetch request against the current leader and stores the response in
     * {@link #fetchResponse}.
     *
     * <p>On a fetch error other than "offset out of range" the consumer is closed and a new
     * leader is looked up so that the next attempt can reconnect. An out-of-range offset is a
     * problem of the request, not of the broker, so the connection is kept.
     *
     * @param a_topic       topic name
     * @param a_partition   partition id
     * @param a_beginOffset offset to start fetching at
     * @return {@code true} if the fetch succeeded without error
     */
    private boolean checkLeader(String a_topic, int a_partition, long a_beginOffset) {
        if (!this.checkConsumer(a_topic, a_partition)) {
            return false;
        }
        FetchRequest req = new FetchRequestBuilder().clientId(this.pool.getClientId()).addFetch(a_topic, a_partition, a_beginOffset, FETCH_SIZE).build();
        this.fetchResponse = this.consumer.get().fetch(req);
        if (!this.fetchResponse.hasError()) {
            return true;
        }
        String leadHost = this.metadata.leader().host();
        short code = this.fetchResponse.errorCode(a_topic, a_partition);
        logger.error("Error fetching data from the Broker:" + leadHost + " Reason: " + code);
        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
            // The leader answered; failing over (and sleeping) would not make the offset valid.
            return false;
        }
        this.closeConsumer();
        try {
            this.metadata = this.findNewLeader(leadHost, a_topic, a_partition);
        }
        catch (MQException e) {
            logger.error("Find new leader failed.", (Throwable)e);
            // Drop the stale metadata so the next attempt bootstraps from Zookeeper again
            // instead of reconnecting to the broker that just failed.
            this.metadata = null;
        }
        return false;
    }

    /**
     * Ensures a consumer connected to the partition leader exists, resolving the leader from
     * the Zookeeper-provided broker list when no metadata is cached.
     *
     * @param a_topic     topic name
     * @param a_partition partition id
     * @return {@code true} if a consumer is available, {@code false} if no metadata or no
     *         leader could be found
     */
    private boolean checkConsumer(String a_topic, int a_partition) {
        if (this.consumer.get() != null) {
            return true;
        }
        if (this.metadata == null) {
            this.loadSeedBrokers(a_topic);
            this.metadata = this.findLeader(this.replicaBrokers, a_topic, a_partition);
        }
        if (this.metadata == null) {
            logger.error("Can't find metadata for Topic and Partition. Exiting");
            return false;
        }
        if (this.metadata.leader() == null) {
            logger.error("Can't find Leader for Topic and Partition. Exiting");
            // Do not cache leaderless metadata, otherwise every later call fails the same way.
            this.metadata = null;
            return false;
        }
        String leadHost = this.metadata.leader().host();
        int leadPort = this.metadata.leader().port();
        String clientName = this.pool.getClientId();
        SimpleConsumer created = new SimpleConsumer(leadHost, leadPort, SOCKET_TIMEOUT_MS, SOCKET_BUFFER_SIZE, clientName);
        if (!this.consumer.compareAndSet(null, created)) {
            // Another consumer was installed in the meantime; do not leak this one.
            created.close();
        }
        return true;
    }

    /**
     * Replaces {@link #replicaBrokers} with the {@code host:port} entries of the
     * comma-separated broker string obtained for the topic. Malformed entries are logged and
     * skipped.
     */
    private void loadSeedBrokers(String topic) {
        this.replicaBrokers.clear();
        String brokerStr = this.getBrokerStr(topic);
        if (brokerStr == null || brokerStr.trim().isEmpty()) {
            return;
        }
        for (String broker : brokerStr.split(",")) {
            String[] hostport = broker.trim().split(":");
            if (hostport.length < 2) {
                logger.error("Ignoring malformed broker entry [" + broker + "]");
                continue;
            }
            try {
                this.replicaBrokers.put(hostport[0], Integer.valueOf(hostport[1].trim()));
            }
            catch (NumberFormatException e) {
                logger.error("Ignoring broker entry with invalid port [" + broker + "]", (Throwable)e);
            }
        }
    }

    /** Reads the broker info string for the topic from Zookeeper, always closing the helper. */
    private String getBrokerStr(String topic) {
        ZookeeperHosts zkHosts = new ZookeeperHosts(this.pool.getZookeeperStr(), topic);
        ZookeeperBrokers brokers = new ZookeeperBrokers(zkHosts);
        try {
            return brokers.getBrokerInfo();
        }
        finally {
            brokers.close();
        }
    }

    /** Reads the partition count for the topic from Zookeeper, always closing the helper. */
    private int getPartitionNum(String topic) {
        ZookeeperHosts zkHosts = new ZookeeperHosts(this.pool.getZookeeperStr(), topic);
        ZookeeperBrokers brokers = new ZookeeperBrokers(zkHosts);
        try {
            return brokers.getNumPartitions();
        }
        finally {
            brokers.close();
        }
    }
}

