/*
 * Decompiled with CFR 0.152.
 */
package org.darkphoenixs.kafka.pool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.darkphoenixs.kafka.core.KafkaMessageSender;
import org.darkphoenixs.kafka.core.KafkaMessageSenderImpl;
import org.darkphoenixs.kafka.core.ZookeeperBrokers;
import org.darkphoenixs.kafka.core.ZookeeperHosts;
import org.darkphoenixs.kafka.pool.KafkaPoolThreadFactory;
import org.darkphoenixs.kafka.pool.MessageSenderPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PropertiesLoaderUtils;

public class KafkaMessageSenderPool<K, V>
implements MessageSenderPool<K, V> {
    private static final String tagger = "KafkaMessageSenderPool";
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSenderPool.class);
    private static final int defaultSize = Runtime.getRuntime().availableProcessors() * 2 + 1;
    protected Semaphore freeSender;
    protected LinkedBlockingQueue<KafkaMessageSender<K, V>> queue;
    protected ExecutorService pool;
    protected ReadWriteLock closingLock = new ReentrantReadWriteLock();
    protected Properties props = new Properties();
    protected AtomicBoolean running = new AtomicBoolean(false);
    private int poolSize;
    private Resource config;
    private ThreadFactory threadFactory;

    public ThreadFactory getThreadFactory() {
        return this.threadFactory;
    }

    public void setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    public void setZkhosts(ZookeeperHosts zkhosts) {
        ZookeeperBrokers brokers = new ZookeeperBrokers(zkhosts.getBrokerZkStr(), zkhosts.getBrokerZkPath(), zkhosts.getTopic());
        this.setBrokerStr(brokers.getBrokerInfo());
        brokers.close();
    }

    public String getClientId() {
        return this.props.getProperty("client.id");
    }

    public void setClientId(String clientId) {
        this.props.setProperty("client.id", clientId);
    }

    public String getBrokerStr() {
        return this.props.getProperty("metadata.broker.list");
    }

    public void setBrokerStr(String brokerStr) {
        this.props.setProperty("metadata.broker.list", brokerStr);
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public Resource getConfig() {
        return this.config;
    }

    public void setConfig(Resource config) {
        this.config = config;
        try {
            PropertiesLoaderUtils.fillProperties((Properties)this.props, (Resource)this.config);
        }
        catch (IOException e) {
            logger.error("Fill properties failed.", (Throwable)e);
        }
    }

    public Properties getProps() {
        return this.props;
    }

    public void setProps(Properties props) {
        this.props = props;
    }

    @Override
    public synchronized void init() {
        if (this.poolSize == 0 || this.poolSize < defaultSize) {
            this.setPoolSize(defaultSize);
        }
        this.freeSender = new Semaphore(this.poolSize);
        this.queue = new LinkedBlockingQueue(this.poolSize);
        this.threadFactory = new KafkaPoolThreadFactory("KafkaMessageSenderPool-" + this.getBrokerStr(), true);
        this.pool = Executors.newFixedThreadPool(this.poolSize, this.threadFactory);
        logger.info("Message sender pool initializing. poolSize : " + this.poolSize + " config : " + this.props.toString());
        ArrayList<InitTask> taskList = new ArrayList<InitTask>();
        CountDownLatch count = new CountDownLatch(this.poolSize);
        for (int i = 0; i < this.poolSize; ++i) {
            taskList.add(new InitTask(count));
        }
        try {
            this.pool.invokeAll(taskList);
            count.await(2L, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            logger.error("Failed to init the MessageSenderPool", (Throwable)e);
        }
        logger.info("Message sender pool initialized.");
        this.running.set(true);
    }

    @Override
    public KafkaMessageSender<K, V> getSender() {
        try {
            if (this.freeSender != null && !this.freeSender.tryAcquire(2000L, TimeUnit.MILLISECONDS)) {
                throw new RuntimeException("Timeout waiting for idle object in the pool.");
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException("Interrupted waiting for idle object in the pool .");
        }
        KafkaMessageSender<K, V> sender = null;
        this.closingLock.readLock().lock();
        try {
            sender = this.queue.poll();
            if (sender == null) {
                sender = new KafkaMessageSenderImpl(this.props);
                logger.info("Add new sender to the pool.");
                this.queue.offer(sender);
            }
        }
        catch (Exception e) {
            logger.error("Failed to get the MessageSender", (Throwable)e);
        }
        finally {
            this.closingLock.readLock().unlock();
        }
        return sender;
    }

    @Override
    public void returnSender(KafkaMessageSender<K, V> sender) {
        if (this.queue.contains(sender)) {
            return;
        }
        this.queue.offer(sender);
        this.freeSender.release();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized void destroy() {
        logger.info("Message sender pool closing.");
        this.closingLock.writeLock().lock();
        try {
            ArrayList<DestroyTask> taskList = new ArrayList<DestroyTask>();
            int size = this.queue.size();
            CountDownLatch count = new CountDownLatch(size);
            for (int i = 0; i < size; ++i) {
                taskList.add(new DestroyTask(count));
            }
            this.pool.invokeAll(taskList);
            count.await(2L, TimeUnit.MINUTES);
            this.pool.shutdownNow();
        }
        catch (Exception e) {
            logger.error("Failed to close the MessageSenderPool", (Throwable)e);
        }
        finally {
            this.closingLock.writeLock().unlock();
        }
        logger.info("Message sender pool closed.");
        this.running.set(false);
    }

    @Override
    public synchronized boolean isRunning() {
        return this.running.get();
    }

    class DestroyTask
    implements Callable<Boolean> {
        CountDownLatch count;

        public DestroyTask(CountDownLatch count) {
            this.count = count;
        }

        @Override
        public Boolean call() throws Exception {
            KafkaMessageSender sender = KafkaMessageSenderPool.this.queue.poll();
            sender.shutDown();
            this.count.countDown();
            return true;
        }
    }

    class InitTask
    implements Callable<Boolean> {
        CountDownLatch count;

        public InitTask(CountDownLatch count) {
            this.count = count;
        }

        @Override
        public Boolean call() throws Exception {
            KafkaMessageSenderImpl sender = new KafkaMessageSenderImpl(KafkaMessageSenderPool.this.props);
            KafkaMessageSenderPool.this.queue.offer(sender);
            this.count.countDown();
            return true;
        }
    }
}

