/*
 * Decompiled with CFR 0.152.
 */
package com.github.brainlag.nsq;

import com.github.brainlag.nsq.Connection;
import com.github.brainlag.nsq.NSQCommand;
import com.github.brainlag.nsq.NSQConfig;
import com.github.brainlag.nsq.ServerAddress;
import com.github.brainlag.nsq.exceptions.BadMessageException;
import com.github.brainlag.nsq.exceptions.BadTopicException;
import com.github.brainlag.nsq.exceptions.NSQException;
import com.github.brainlag.nsq.exceptions.NoConnectionsException;
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.pool.ConnectionPoolFactory;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

/**
 * Publishes messages to one or more NSQ servers through a keyed pool of
 * {@link Connection}s, picking the target server in round-robin order.
 *
 * <p>Typical usage: register servers with {@link #addAddress(String, int)},
 * call {@link #start()}, publish with {@link #produce(String, byte[])} or
 * {@link #produceMulti(String, List)}, and finally call {@link #shutdown()}.
 * The configuration setters only take effect before {@link #start()}.
 */
public class NSQProducer {
    private final Set<ServerAddress> addresses = Sets.newConcurrentHashSet();
    private int roundRobinCount = 0;
    private volatile boolean started = false;
    private ExecutorService executor = Executors.newCachedThreadPool();
    private GenericKeyedObjectPoolConfig poolConfig = null;
    private GenericKeyedObjectPool<ServerAddress, Connection> pool;
    private NSQConfig config = new NSQConfig();
    private int connectionRetries = 5;

    /**
     * Creates the connection pool and marks the producer as started.
     * Calling this on an already started producer does nothing.
     *
     * @return this producer, for call chaining
     */
    public synchronized NSQProducer start() {
        if (!this.started) {
            // Build the pool BEFORE publishing the started flag: a thread that
            // observes started == true must never see a null pool, and a
            // failure in createPool() must not leave the producer "started".
            this.createPool();
            this.started = true;
        }
        return this;
    }

    /**
     * Instantiates the keyed connection pool. When no pool configuration was
     * supplied, a default one with test-on-borrow enabled and JMX disabled is used.
     */
    private void createPool() {
        if (this.poolConfig == null) {
            this.poolConfig = new GenericKeyedObjectPoolConfig();
            this.poolConfig.setTestOnBorrow(true);
            this.poolConfig.setJmxEnabled(false);
        }
        this.pool = new GenericKeyedObjectPool<ServerAddress, Connection>(new ConnectionPoolFactory(this.config), this.poolConfig);
    }

    /**
     * Borrows a connection from the pool, choosing the server round-robin
     * among the configured addresses.
     *
     * <p>When the pool answers with {@link NoSuchElementException} the borrow
     * is retried, sleeping one second between attempts, for at most
     * {@code connectionRetries} attempts in total.
     *
     * @return a pooled connection; the caller must return it to the pool
     * @throws IllegalStateException if no server address is configured
     * @throws NoConnectionsException if every attempt failed, the pool raised
     *         any other exception, or the thread was interrupted while waiting
     */
    protected Connection getConnection() throws NoConnectionsException {
        NoSuchElementException lastFailure = null;
        for (int attempt = 0; attempt < this.connectionRetries; ++attempt) {
            // A zero-length seed makes toArray allocate an exactly sized array,
            // so a concurrent removeAddress() cannot leave trailing null slots.
            ServerAddress[] serverAddresses = this.addresses.toArray(new ServerAddress[0]);
            if (serverAddresses.length == 0) {
                throw new IllegalStateException("No server configured for producer");
            }
            // Mask off the sign bit so the index stays non-negative after the
            // counter wraps past Integer.MAX_VALUE.
            int index = (this.roundRobinCount++ & Integer.MAX_VALUE) % serverAddresses.length;
            try {
                return this.pool.borrowObject(serverAddresses[index]);
            }
            catch (NoSuchElementException e) {
                // Remember the failure and fall through to the back-off below.
                lastFailure = e;
            }
            catch (Exception ex) {
                throw new NoConnectionsException("Could not acquire a connection to a server", ex);
            }
            // No point in sleeping once the last attempt has been used up.
            if (attempt + 1 < this.connectionRetries) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ix) {
                    // Restore the flag so code further up the stack can see the interrupt.
                    Thread.currentThread().interrupt();
                    throw new NoConnectionsException("Could not acquire a connection to a server", ix);
                }
            }
        }
        throw new NoConnectionsException("Could not acquire a connection to a server", lastFailure);
    }

    /**
     * Publishes several messages to a topic with a single multi-publish command.
     * A {@code null} or empty list is a no-op; a single-element list is
     * delegated to {@link #produce(String, byte[])}.
     *
     * @param topic    the topic to publish to
     * @param messages the message bodies
     * @throws IllegalStateException if the producer has not been started
     * @throws BadTopicException     if the server answers with an {@code E_BAD_TOPIC} error
     * @throws BadMessageException   if the server answers with an {@code E_BAD_MESSAGE} error
     * @throws NSQException          if no connection could be acquired
     * @throws TimeoutException      propagated from {@code Connection.commandAndWait}
     */
    public void produceMulti(String topic, List<byte[]> messages) throws TimeoutException, NSQException {
        if (!this.started) {
            throw new IllegalStateException("Producer must be started before producing messages!");
        }
        if (messages == null || messages.isEmpty()) {
            return;
        }
        if (messages.size() == 1) {
            this.produce(topic, messages.get(0));
            return;
        }
        Connection c = this.getConnection();
        try {
            NSQCommand command = NSQCommand.multiPublish(topic, messages);
            NSQProducer.throwOnErrorFrame(c.commandAndWait(command));
        }
        finally {
            // Always hand the connection back, even when the publish failed.
            this.pool.returnObject(c.getServerAddress(), c);
        }
    }

    /**
     * Publishes a single message to a topic.
     *
     * @param topic   the topic to publish to
     * @param message the message body
     * @throws IllegalStateException if the producer has not been started
     * @throws BadTopicException     if the server answers with an {@code E_BAD_TOPIC} error
     * @throws BadMessageException   if the server answers with an {@code E_BAD_MESSAGE} error
     * @throws NSQException          if no connection could be acquired
     * @throws TimeoutException      propagated from {@code Connection.commandAndWait}
     */
    public void produce(String topic, byte[] message) throws NSQException, TimeoutException {
        if (!this.started) {
            throw new IllegalStateException("Producer must be started before producing messages!");
        }
        Connection c = this.getConnection();
        try {
            NSQCommand command = NSQCommand.publish(topic, message);
            NSQProducer.throwOnErrorFrame(c.commandAndWait(command));
        }
        finally {
            // Always hand the connection back, even when the publish failed.
            this.pool.returnObject(c.getServerAddress(), c);
        }
    }

    /**
     * Translates an error response into the matching exception.
     * Frames that are {@code null} or not an {@link ErrorFrame} are accepted silently.
     */
    private static void throwOnErrorFrame(NSQFrame frame) throws NSQException {
        if (!(frame instanceof ErrorFrame)) {
            return;
        }
        String err = ((ErrorFrame)frame).getErrorMessage();
        if (err.startsWith("E_BAD_TOPIC")) {
            throw new BadTopicException(err);
        }
        if (err.startsWith("E_BAD_MESSAGE")) {
            throw new BadMessageException(err);
        }
        // NOTE(review): any other error frame is ignored here, so the caller
        // sees a successful publish. Confirm whether that is intended.
    }

    /**
     * Registers a server to publish to.
     *
     * @param host server host
     * @param port server port
     * @return this producer, for call chaining
     */
    public NSQProducer addAddress(String host, int port) {
        this.addresses.add(new ServerAddress(host, port));
        return this;
    }

    /**
     * Removes a previously registered server.
     *
     * @param host server host
     * @param port server port
     * @return this producer, for call chaining
     */
    public NSQProducer removeAddress(String host, int port) {
        this.addresses.remove(new ServerAddress(host, port));
        return this;
    }

    /**
     * Sets the pool configuration. Ignored once the producer has been started.
     *
     * @param poolConfig configuration for the connection pool
     * @return this producer, for call chaining
     */
    public NSQProducer setPoolConfig(GenericKeyedObjectPoolConfig poolConfig) {
        if (!this.started) {
            this.poolConfig = poolConfig;
        }
        return this;
    }

    /**
     * Replaces the executor. Ignored once the producer has been started.
     *
     * @param executor the executor to use
     * @return this producer, for call chaining
     */
    public NSQProducer setExecutor(ExecutorService executor) {
        if (!this.started) {
            this.executor = executor;
        }
        return this;
    }

    /**
     * Sets the NSQ configuration handed to the connection factory.
     * Ignored once the producer has been started.
     *
     * @param config the configuration to use
     * @return this producer, for call chaining
     */
    public NSQProducer setConfig(NSQConfig config) {
        if (!this.started) {
            this.config = config;
        }
        return this;
    }

    /**
     * @return the executor currently held by this producer
     */
    protected ExecutorService getExecutor() {
        return this.executor;
    }

    /**
     * @return the connection pool, or {@code null} if the producer was never started
     */
    public GenericKeyedObjectPool<ServerAddress, Connection> getPool() {
        return this.pool;
    }

    /**
     * Marks the producer as stopped, closes the connection pool and shuts the
     * executor down. Safe to call on a producer that was never started.
     */
    public synchronized void shutdown() {
        this.started = false;
        // The pool only exists after start(); guard against shutdown-before-start.
        if (this.pool != null) {
            this.pool.close();
        }
        if (this.executor != null) {
            this.executor.shutdown();
        }
    }
}

