/*
 * Decompiled with CFR 0.152.
 */
package com.github.brainlag.nsq;

import com.github.brainlag.nsq.Connection;
import com.github.brainlag.nsq.NSQCommand;
import com.github.brainlag.nsq.NSQConfig;
import com.github.brainlag.nsq.NSQMessage;
import com.github.brainlag.nsq.ServerAddress;
import com.github.brainlag.nsq.callbacks.NSQErrorCallback;
import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
import com.github.brainlag.nsq.exceptions.NoConnectionsException;
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.lookup.NSQLookup;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;

public class NSQConsumer
implements Closeable {
    private final NSQLookup lookup;
    private final String topic;
    private final String channel;
    private final NSQMessageCallback callback;
    private final NSQErrorCallback errorCallback;
    private final NSQConfig config;
    private volatile long nextTimeout = 0L;
    private final Map<ServerAddress, Connection> connections = Maps.newHashMap();
    private final AtomicLong totalMessages = new AtomicLong(0L);
    private boolean started = false;
    private int messagesPerBatch;
    private long lookupPeriod = 60000L;
    private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private Executor executor = Executors.newCachedThreadPool();
    private Optional<ScheduledFuture<?>> timeout = Optional.empty();

    public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCallback callback) {
        this(lookup, topic, channel, callback, new NSQConfig());
    }

    public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCallback callback, NSQConfig config) {
        this(lookup, topic, channel, callback, config, null);
    }

    public NSQConsumer(NSQLookup lookup, String topic, String channel, NSQMessageCallback callback, NSQConfig config, NSQErrorCallback errCallback) {
        this.lookup = lookup;
        this.topic = topic;
        this.channel = channel;
        this.config = config;
        this.callback = callback;
        this.errorCallback = errCallback;
        this.messagesPerBatch = config.getMaxInFlight().orElse(200);
    }

    public NSQConsumer start() {
        if (!this.started) {
            this.started = true;
            this.connect();
            this.scheduler.scheduleAtFixedRate(() -> this.connect(), this.lookupPeriod, this.lookupPeriod, TimeUnit.MILLISECONDS);
        }
        return this;
    }

    private Connection createConnection(ServerAddress serverAddress) {
        try {
            Connection connection = new Connection(serverAddress, this.config);
            connection.setConsumer(this);
            connection.setErrorCallback(this.errorCallback);
            connection.command(NSQCommand.subscribe(this.topic, this.channel));
            connection.command(NSQCommand.ready(this.messagesPerBatch));
            return connection;
        }
        catch (NoConnectionsException e) {
            LogManager.getLogger(this).warn("Could not create connection to server {}", new Object[]{serverAddress.toString(), e});
            return null;
        }
    }

    protected void processMessage(NSQMessage message) {
        if (this.callback == null) {
            LogManager.getLogger(this).warn("NO Callback, dropping message: " + message);
        } else {
            try {
                this.executor.execute(() -> this.callback.message(message));
                if (this.nextTimeout > 0L) {
                    this.updateTimeout(message, -500L);
                }
            }
            catch (RejectedExecutionException re) {
                LogManager.getLogger(this).trace("Backing off");
                message.requeue();
                this.updateTimeout(message, 500L);
            }
        }
        long tot = this.totalMessages.incrementAndGet();
        if (tot % (long)this.messagesPerBatch > (long)(this.messagesPerBatch / 2)) {
            this.rdy(message, this.messagesPerBatch);
        }
    }

    private void updateTimeout(NSQMessage message, long change) {
        Date newTimeout;
        this.rdy(message, 0);
        LogManager.getLogger(this).trace("RDY 0! Halt Flow.");
        if (this.timeout.isPresent()) {
            this.timeout.get().cancel(true);
        }
        if ((newTimeout = this.calculateTimeoutDate(change)) != null) {
            this.timeout = Optional.of(this.scheduler.schedule(() -> this.rdy(message, 1), 0L, TimeUnit.MILLISECONDS));
        }
    }

    private void rdy(NSQMessage message, int size) {
        message.getConnection().command(NSQCommand.ready(size));
    }

    private Date calculateTimeoutDate(long i) {
        if (System.currentTimeMillis() - this.nextTimeout + i > 50L) {
            this.nextTimeout += i;
            return new Date();
        }
        this.nextTimeout = 0L;
        return null;
    }

    public void shutdown() {
        this.scheduler.shutdown();
        this.cleanClose();
    }

    private void cleanClose() {
        NSQCommand command = NSQCommand.startClose();
        try {
            for (Connection connection : this.connections.values()) {
                String err;
                NSQFrame frame = connection.commandAndWait(command);
                if (frame == null || !(frame instanceof ErrorFrame) || !(err = ((ErrorFrame)frame).getErrorMessage()).startsWith("E_INVALID")) continue;
                throw new IllegalStateException(err);
            }
        }
        catch (TimeoutException e) {
            LogManager.getLogger(this).warn("No clean disconnect", (Throwable)e);
        }
    }

    public NSQConsumer setMessagesPerBatch(int messagesPerBatch) {
        if (!this.started) {
            this.messagesPerBatch = messagesPerBatch;
        }
        return this;
    }

    public NSQConsumer setLookupPeriod(long periodMillis) {
        if (!this.started) {
            this.lookupPeriod = periodMillis;
        }
        return this;
    }

    private void connect() {
        Iterator<Map.Entry<ServerAddress, Connection>> it = this.connections.entrySet().iterator();
        while (it.hasNext()) {
            Connection cnn = it.next().getValue();
            if (cnn.isConnected() && cnn.isHeartbeatStatusOK()) continue;
            cnn.close();
            it.remove();
        }
        Set<ServerAddress> newAddresses = this.lookupAddresses();
        Set<ServerAddress> oldAddresses = this.connections.keySet();
        LogManager.getLogger(this).debug("Addresses NSQ connected to: " + newAddresses);
        if (newAddresses.isEmpty()) {
            LogManager.getLogger(this).warn("No NSQLookup server connections or topic does not exist.");
        } else {
            for (ServerAddress server : Sets.difference(oldAddresses, newAddresses)) {
                LogManager.getLogger(this).info("Remove connection " + server.toString());
                this.connections.get(server).close();
                this.connections.remove(server);
            }
            for (ServerAddress server : Sets.difference(newAddresses, oldAddresses)) {
                Connection connection;
                if (this.connections.containsKey(server) || (connection = this.createConnection(server)) == null) continue;
                this.connections.put(server, connection);
            }
        }
    }

    public long getTotalMessages() {
        return this.totalMessages.get();
    }

    public NSQConsumer setExecutor(Executor executor) {
        if (!this.started) {
            this.executor = executor;
        }
        return this;
    }

    private Set<ServerAddress> lookupAddresses() {
        return this.lookup.lookup(this.topic);
    }

    public ScheduledFuture scheduleRun(Runnable task, int delay, int period, TimeUnit unit) {
        return this.scheduler.scheduleAtFixedRate(task, delay, period, unit);
    }

    public NSQConsumer setScheduledExecutor(ScheduledExecutorService scheduler) {
        if (!this.started) {
            this.scheduler = scheduler;
        }
        return this;
    }

    @Override
    public void close() throws IOException {
        this.shutdown();
    }
}

