package com.github.brainlag.nsq;

import com.github.brainlag.nsq.callbacks.NSQErrorCallback;
import com.github.brainlag.nsq.exceptions.NSQException;
import com.github.brainlag.nsq.exceptions.NoConnectionsException;
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.MessageFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.frames.ResponseFrame;
import com.github.brainlag.nsq.netty.NSQClientInitializer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;

/* loaded from: input_file:com/github/brainlag/nsq/Connection.class */
/**
 * A single client connection to an NSQ server, backed by a Netty {@link Channel}.
 *
 * <p>The constructor connects, writes the protocol magic and sends an IDENTIFY command built
 * from the supplied {@link NSQConfig}. Afterwards commands are written with
 * {@link #command(NSQCommand)} (fire and forget) or {@link #commandAndWait(NSQCommand)}
 * (blocks for the reply). Frames read from the channel are expected to be handed to
 * {@link #incoming(NSQFrame)}.
 *
 * <p>At most one {@code commandAndWait} call is in flight at a time: both the request queue and
 * the response queue have a capacity of one.
 */
public class Connection {
    /** Maximum age, in milliseconds, of the last heartbeat for {@link #isHeartbeatStatusOK()}. */
    public static final long HEARTBEAT_MAX_INTERVAL = 60000;
    /** Protocol magic written to the channel right after connecting. */
    public static final byte[] MAGIC_PROTOCOL_VERSION = "  V2".getBytes(StandardCharsets.US_ASCII);
    /** Channel attribute under which the owning {@code Connection} is stored. */
    public static final AttributeKey<Connection> STATE = AttributeKey.valueOf("Connection.state");

    /** Timeout, in seconds, applied to each blocking step of {@link #commandAndWait(NSQCommand)}. */
    private static final long COMMAND_TIMEOUT_SECONDS = 15L;
    /** Timeout, in seconds, for handing a response frame over to a waiting caller. */
    private static final long RESPONSE_OFFER_TIMEOUT_SECONDS = 20L;
    /** Message text of the response frame that is treated as a heartbeat. */
    private static final String HEARTBEAT_MESSAGE = "_heartbeat_";

    /** Lazily created event loop group shared by connections whose config does not supply one. */
    private static EventLoopGroup defaultGroup = null;

    private final ServerAddress address;
    private final Channel channel;
    private final EventLoopGroup eventLoopGroup;
    private final NSQConfig config;
    // Written through the public setters and read in incoming(); volatile so a value set by one
    // thread is visible to whichever thread delivers frames.
    private volatile NSQConsumer consumer = null;
    private volatile NSQErrorCallback errorCallback = null;
    /** Holds the command currently awaiting a reply; capacity 1 serializes commandAndWait callers. */
    private final LinkedBlockingQueue<NSQCommand> requests = new LinkedBlockingQueue<>(1);
    /** Holds the reply for the command currently in {@link #requests}. */
    private final LinkedBlockingQueue<NSQFrame> responses = new LinkedBlockingQueue<>(1);
    /** Wall-clock time (epoch millis) of the last handled heartbeat; starts at construction time. */
    private final AtomicLong lastHeartbeatSuccess = new AtomicLong(System.currentTimeMillis());

    /**
     * Connects to the given server, sends the protocol magic and identifies this client.
     *
     * <p>If the IDENTIFY exchange times out, the error is logged and the channel is closed, but
     * the constructor still returns; callers can detect this through {@link #isConnected()}.
     *
     * @param serverAddress host and port to connect to
     * @param nSQConfig     client configuration; its {@code toString()} is sent as the IDENTIFY body
     * @throws NoConnectionsException if the TCP connection cannot be established
     */
    public Connection(ServerAddress serverAddress, NSQConfig nSQConfig) throws NoConnectionsException {
        this.address = serverAddress;
        this.config = nSQConfig;

        EventLoopGroup configuredGroup = nSQConfig.getEventLoopGroup();
        this.eventLoopGroup = configuredGroup != null ? configuredGroup : getDefaultGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(this.eventLoopGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new NSQClientInitializer());

        ChannelFuture connect = bootstrap.connect(new InetSocketAddress(serverAddress.getHost(), serverAddress.getPort()));
        this.channel = connect.awaitUninterruptibly().channel();
        if (!connect.isSuccess()) {
            throw new NoConnectionsException("Could not connect to server", connect.cause());
        }
        LogManager.getLogger(this).info("Created connection: " + serverAddress.toString());
        this.channel.attr(STATE).set(this);

        ByteBuf magic = Unpooled.buffer();
        magic.writeBytes(MAGIC_PROTOCOL_VERSION);
        this.channel.write(magic);
        this.channel.flush();

        try {
            NSQFrame reply = commandAndWait(NSQCommand.identify(nSQConfig.toString().getBytes(StandardCharsets.UTF_8)));
            if (reply instanceof ResponseFrame) {
                LogManager.getLogger(this).info("Server identification: " + ((ResponseFrame) reply).getMessage());
            } else if (reply != null) {
                // incoming() also queues ErrorFrames as replies, so a blind cast could fail here.
                LogManager.getLogger(this).warn("Unexpected reply to identify: " + reply);
            }
        } catch (TimeoutException e) {
            LogManager.getLogger(this).error("Creating connection timed out", e);
            close();
        }
    }

    /**
     * Returns the shared default event loop group, creating it on first use.
     *
     * <p>Synchronized so that concurrent constructors cannot each create their own group.
     */
    private static synchronized EventLoopGroup getDefaultGroup() {
        if (defaultGroup == null) {
            defaultGroup = new NioEventLoopGroup();
        }
        return defaultGroup;
    }

    /** @return {@code true} while the underlying channel is active */
    public boolean isConnected() {
        return this.channel.isActive();
    }

    /** @return {@code true} if a {@link #commandAndWait(NSQCommand)} call is awaiting its reply */
    public boolean isRequestInProgress() {
        return !this.requests.isEmpty();
    }

    /**
     * @return {@code true} if the last heartbeat (or construction, if none yet) happened at most
     *         {@link #HEARTBEAT_MAX_INTERVAL} milliseconds ago
     */
    public boolean isHeartbeatStatusOK() {
        return System.currentTimeMillis() - this.lastHeartbeatSuccess.get() <= HEARTBEAT_MAX_INTERVAL;
    }

    /**
     * Dispatches a frame received from the server.
     *
     * <ul>
     *   <li>Response frames: heartbeats are answered directly; other responses are handed to the
     *       caller waiting in {@link #commandAndWait(NSQCommand)}, or dropped if nobody waits.</li>
     *   <li>Error frames: reported to the error callback (if set) and queued as a reply.</li>
     *   <li>Message frames: converted to an {@link NSQMessage} and passed to the consumer.</li>
     *   <li>Anything else is logged as unknown.</li>
     * </ul>
     *
     * @param nSQFrame the decoded frame
     */
    public void incoming(NSQFrame nSQFrame) {
        if (nSQFrame instanceof ResponseFrame) {
            handleResponse((ResponseFrame) nSQFrame);
        } else if (nSQFrame instanceof ErrorFrame) {
            handleError((ErrorFrame) nSQFrame);
        } else if (nSQFrame instanceof MessageFrame) {
            handleMessage((MessageFrame) nSQFrame);
        } else {
            LogManager.getLogger(this).warn("Unknown frame type: " + nSQFrame);
        }
    }

    /** Answers heartbeats and forwards any other response to a waiting commandAndWait caller. */
    private void handleResponse(ResponseFrame frame) {
        if (HEARTBEAT_MESSAGE.equals(frame.getMessage())) {
            heartbeat();
            return;
        }
        if (this.requests.isEmpty()) {
            // Nobody is waiting for a reply (e.g. the caller already timed out): drop it.
            return;
        }
        try {
            if (!this.responses.offer(frame, RESPONSE_OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LogManager.getLogger(this).warn("Dropping response, previous response was not consumed: " + frame);
            }
        } catch (InterruptedException e) {
            LogManager.getLogger(this).error("Thread was interruped, probably shuthing down", e);
            close();
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /** Notifies the error callback and queues the error as the reply to a pending command. */
    private void handleError(ErrorFrame frame) {
        NSQErrorCallback callback = this.errorCallback;
        if (callback != null) {
            callback.error(NSQException.of(frame));
        }
        // offer() instead of add(): the queue holds one element, and add() would throw
        // IllegalStateException when an earlier frame is still sitting in it.
        if (!this.responses.offer(frame)) {
            LogManager.getLogger(this).warn("Dropping error frame, previous response was not consumed: " + frame);
        }
    }

    /** Wraps a message frame in an NSQMessage and passes it to the registered consumer. */
    private void handleMessage(MessageFrame frame) {
        NSQConsumer target = this.consumer;
        if (target == null) {
            LogManager.getLogger(this).warn("Dropping message, no consumer registered on " + this);
            return;
        }
        NSQMessage message = new NSQMessage();
        message.setAttempts(frame.getAttempts());
        message.setConnection(this);
        message.setId(frame.getMessageId());
        message.setMessage(frame.getMessageBody());
        // The frame timestamp is converted from nanoseconds to the milliseconds Date expects.
        message.setTimestamp(new Date(TimeUnit.NANOSECONDS.toMillis(frame.getTimestamp())));
        target.processMessage(message);
    }

    /** Replies to a server heartbeat with a NOP command and records the time. */
    private void heartbeat() {
        LogManager.getLogger(this).trace("HEARTBEAT!");
        command(NSQCommand.nop());
        this.lastHeartbeatSuccess.set(System.currentTimeMillis());
    }

    /**
     * Registers the callback that is notified for every error frame received.
     *
     * @param nSQErrorCallback the callback, or {@code null} to disable notification
     */
    public void setErrorCallback(NSQErrorCallback nSQErrorCallback) {
        this.errorCallback = nSQErrorCallback;
    }

    /** Disconnects the underlying channel. */
    public void close() {
        LogManager.getLogger(this).info("Closing  connection: " + this);
        this.channel.disconnect();
    }

    /**
     * Sends a command and blocks until its reply arrives.
     *
     * <p>Each of the three steps (claiming the single request slot, writing the command, waiting
     * for the reply) may take up to 15 seconds. The request slot is always released on exit, so
     * a timed-out or interrupted call does not block subsequent commands.
     *
     * @param nSQCommand the command to send
     * @return the reply frame (a response or an error frame), or {@code null} if the calling
     *         thread was interrupted; in that case the connection is closed and the thread's
     *         interrupt flag is set again
     * @throws TimeoutException if any step does not complete within its timeout
     */
    public NSQFrame commandAndWait(NSQCommand nSQCommand) throws TimeoutException {
        boolean slotClaimed = false;
        try {
            slotClaimed = this.requests.offer(nSQCommand, COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!slotClaimed) {
                throw new TimeoutException("command: " + nSQCommand + " timedout");
            }
            // Discard any stale frame left over from an earlier exchange.
            this.responses.clear();
            if (!command(nSQCommand).await(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                throw new TimeoutException("command: " + nSQCommand + " timedout");
            }
            NSQFrame reply = this.responses.poll(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (reply == null) {
                throw new TimeoutException("command: " + nSQCommand + " timedout");
            }
            return reply;
        } catch (InterruptedException e) {
            close();
            LogManager.getLogger(this).warn("Thread was interruped!", e);
            Thread.currentThread().interrupt();
            return null;
        } finally {
            if (slotClaimed) {
                // Only release the slot if this call claimed it; otherwise it belongs to another caller.
                this.requests.remove(nSQCommand);
            }
        }
    }

    /**
     * Writes a command to the channel and flushes it without waiting for a reply.
     *
     * @param nSQCommand the command to send
     * @return the future of the write operation
     */
    public ChannelFuture command(NSQCommand nSQCommand) {
        return this.channel.writeAndFlush(nSQCommand);
    }

    /** @return the address this connection was created for */
    public ServerAddress getServerAddress() {
        return this.address;
    }

    /** @return the configuration this connection was created with */
    public NSQConfig getConfig() {
        return this.config;
    }

    /**
     * Registers the consumer that receives messages arriving on this connection.
     *
     * @param nSQConsumer the consumer to deliver messages to
     */
    public void setConsumer(NSQConsumer nSQConsumer) {
        this.consumer = nSQConsumer;
    }
}
