package io.nats.client;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import io.nats.client.Nats;
import io.nats.client.Parser;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/nats/client/ConnectionImpl.class */
public class ConnectionImpl implements Connection {
    // Client library version; the constructor reads it from the "client.version" property.
    private String version;

    // ---- Protocol tokens and printf-style message templates ----
    private static final String INBOX_PREFIX = "_INBOX.";
    protected static final String STALE_CONNECTION = "Stale Connection";
    protected static final String LANG_STRING = "java";
    // Buffer size requested for the connection's input/output streams.
    protected static final int DEFAULT_BUF_SIZE = 65536;
    protected static final int DEFAULT_STREAM_BUF_SIZE = 65536;
    // Capacity of the queue returned by createFlushChannel().
    protected static final int FLUSH_CHAN_SIZE = 1;
    protected static final String _EMPTY_ = "";
    protected static final String _SPC_ = " ";
    protected static final String _OK_OP_ = "+OK";
    protected static final String _ERR_OP_ = "-ERR";
    protected static final String _MSG_OP_ = "MSG";
    protected static final String _PING_OP_ = "PING";
    protected static final String _PONG_OP_ = "PONG";
    protected static final String _INFO_OP_ = "INFO";
    protected static final String CONN_PROTO = "CONNECT %s\r\n";
    protected static final String PUB_PROTO = "PUB %s %s %d\r\n";
    protected static final String SUB_PROTO = "SUB %s%s %d\r\n";
    protected static final String UNSUB_PROTO = "UNSUB %d %s\r\n";
    protected static final String OK_PROTO = "+OK\r\n";

    // Self reference; the constructor assigns this instance to it.
    private ConnectionImpl nc;
    private Options opts;
    private TcpConnectionFactory tcf;
    // NOTE(review): presumably the outstanding-PING counter behind
    // get/setActualPingsOutstanding() - the accessors are not visible here, confirm.
    private int pout;
    private Statistics stats;
    // Queues waiting for a PONG; processPong() completes the entry at index 0 first.
    private List<BlockingQueue<Boolean>> pongs;

    // ---- Executors and the names handed to their thread factories ----
    private static final int NUM_CORE_THREADS = 4;
    private ScheduledExecutorService exec;
    static final String EXEC_NAME = "jnats-exec";
    private ExecutorService subexec;
    static final String SUB_EXEC_NAME = "jnats-subscriptions";
    private ExecutorService cbexec;
    static final String CB_EXEC_NAME = "jnats-callbacks";
    static final String PINGTIMER = "pingtimer";
    static final String READLOOP = "readloop";
    static final String FLUSHER = "flusher";
    private static final int NUM_WATCHER_THREADS = 2;
    // Flusher signalling queue; processOpError() offers false to it on a connection error.
    private BlockingQueue<Boolean> fch;

    // ---- Pre-encoded protocol fragments (byte form plus cached length) ----
    protected static final String PING_PROTO = "PING\r\n";
    private static final byte[] pingProtoBytes = PING_PROTO.getBytes();
    private static final int pingProtoBytesLen = pingProtoBytes.length;
    protected static final String PONG_PROTO = "PONG\r\n";
    private static final byte[] pongProtoBytes = PONG_PROTO.getBytes();
    private static final int pongProtoBytesLen = pongProtoBytes.length;
    protected static final String _PUB_P_ = "PUB ";
    private static final byte[] pubPrimBytes = _PUB_P_.getBytes();
    private static final int pubPrimBytesLen = pubPrimBytes.length;
    protected static final String CRLF = "\r\n";
    private static final byte[] crlfProtoBytes = CRLF.getBytes();
    private static final int crlfProtoBytesLen = crlfProtoBytes.length;
    // ASCII codes of the characters '0' through '9'.
    static final byte[] digits = {48, 49, 50, 51, 52, 53, 54, 55, 56, 57};

    // ---- Per-connection state ----
    private final Logger logger = LoggerFactory.getLogger(ConnectionImpl.class);
    // Connection state; the public isConnected()/isClosed()/isReconnecting() read it under mu.
    private Nats.ConnState status = Nats.ConnState.DISCONNECTED;
    private long flushTimerInterval = 1;
    private TimeUnit flushTimerUnit = TimeUnit.MICROSECONDS;
    // Main connection lock.
    final Lock mu = new ReentrantLock();
    private final AtomicLong sidCounter = new AtomicLong(0);
    private URI url = null;
    private TcpConnection conn = null;
    // Publish header scratch buffer; buildPublishProtocolBuffer() pre-loads it with "PUB ".
    private ByteBuffer pubProtoBuf = null;
    private OutputStream bw = null;
    private InputStream br = null;
    // In-memory buffer that replaces the socket output stream while reconnecting.
    private ByteArrayOutputStream pending = null;
    private Map<Long, SubscriptionImpl> subs = new ConcurrentHashMap();
    // Candidate servers in the order they will be tried.
    private List<Srv> srvPool = null;
    // Known server URIs keyed by URI authority.
    private Map<String, URI> urls = null;
    private Exception lastEx = null;
    private ServerInfo info = null;
    private Parser parser = new Parser(this);
    // Handle of the scheduled ping timer; cancelled in processOpError().
    private ScheduledFuture<?> ptmr = null;
    private final Map<String, Future<?>> tasks = new HashMap();
    private CountDownLatch socketWatchersStartLatch = new CountDownLatch(2);
    private CountDownLatch socketWatchersDoneLatch = null;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$ClientProto.class */
    /**
     * Client protocol level; {@link #getValue()} is the integer that
     * {@code ConnectInfo} serializes in its "protocol" field.
     */
    public enum ClientProto {
        CLIENT_PROTO_ZERO(0),
        // Literal 1: the protocol level must not depend on the unrelated FLUSH_CHAN_SIZE constant.
        CLIENT_PROTO_INFO(1);

        private final int value;

        ClientProto(int i) {
            this.value = i;
        }

        /**
         * @return the numeric protocol level of this constant
         */
        public int getValue() {
            return this.value;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$ConnectInfo.class */
    /**
     * Immutable set of client options that is rendered as JSON by {@link #toString()}.
     * NOTE(review): presumably the payload substituted into CONN_PROTO - the call site is not
     * visible here, confirm.
     */
    public static class ConnectInfo {

        // One shared serializer instead of one per instance. Gson skips static fields by
        // default, exactly as it skipped the former transient instance field.
        private static final Gson GSON = new GsonBuilder().create();

        @SerializedName("verbose")
        private final Boolean verbose;

        @SerializedName("pedantic")
        private final Boolean pedantic;

        @SerializedName("user")
        private final String user;

        @SerializedName("pass")
        private final String pass;

        @SerializedName("auth_token")
        private final String token;

        @SerializedName("tls_required")
        private final Boolean tlsRequired;

        @SerializedName("name")
        private final String name;

        @SerializedName("lang")
        private final String lang;

        @SerializedName("version")
        private final String version;

        @SerializedName("protocol")
        private final int protocol;

        /**
         * @param z           value for "verbose"
         * @param z2          value for "pedantic"
         * @param str         value for "user"
         * @param str2        value for "pass"
         * @param str3        value for "auth_token"
         * @param z3          value for "tls_required"
         * @param str4        value for "name"
         * @param str5        value for "lang"
         * @param str6        value for "version"
         * @param clientProto protocol level; its numeric value is stored in "protocol"
         */
        public ConnectInfo(boolean z, boolean z2, String str, String str2, String str3, boolean z3, String str4, String str5, String str6, ClientProto clientProto) {
            this.verbose = Boolean.valueOf(z);
            this.pedantic = Boolean.valueOf(z2);
            this.user = str;
            this.pass = str2;
            this.token = str3;
            this.tlsRequired = Boolean.valueOf(z3);
            this.name = str4;
            // The former default assignment of LANG_STRING was always overwritten here.
            this.lang = str5;
            this.version = str6;
            this.protocol = clientProto.getValue();
        }

        /**
         * @return this object serialized to JSON by Gson
         */
        @Override
        public String toString() {
            return GSON.toJson(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$Control.class */
    public static class Control {
        String op;
        String args;

        Control(String str) {
            this.op = null;
            this.args = null;
            if (str == null) {
                return;
            }
            String[] split = str.split(ConnectionImpl._SPC_, 2);
            switch (split.length) {
                case ConnectionImpl.FLUSH_CHAN_SIZE /* 1 */:
                    this.op = split[0].trim();
                    return;
                case 2:
                    this.op = split[0].trim();
                    this.args = split[ConnectionImpl.FLUSH_CHAN_SIZE].trim();
                    if (this.args.isEmpty()) {
                        this.args = null;
                        return;
                    }
                    return;
                default:
                    return;
            }
        }

        public String toString() {
            return "{op=" + this.op + ", args=" + this.args + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$PingTimerTask.class */
    /**
     * Periodic task that sends a PING while connected and reports the connection as stale
     * once more pings are outstanding than the options allow.
     */
    public class PingTimerTask extends TimerTask {
        PingTimerTask() {
        }

        /**
         * Under the connection lock: does nothing when not connected; otherwise increments the
         * outstanding-ping counter and either sends another PING or marks the connection stale.
         * The stale error is raised only after the lock has been released.
         */
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            boolean stale = false;
            ConnectionImpl.this.mu.lock();
            try {
                if (!ConnectionImpl.this.connected()) {
                    return;
                }
                ConnectionImpl.this.setActualPingsOutstanding(ConnectionImpl.this.getActualPingsOutstanding() + 1);
                if (ConnectionImpl.this.getActualPingsOutstanding() > ConnectionImpl.this.opts.getMaxPingsOut()) {
                    stale = true;
                } else {
                    ConnectionImpl.this.sendPing(null);
                }
            } finally {
                ConnectionImpl.this.mu.unlock();
            }
            if (stale) {
                try {
                    ConnectionImpl.this.processOpError(new IOException(Nats.ERR_STALE_CONNECTION));
                } catch (InterruptedException e) {
                    ConnectionImpl.this.logger.warn("Interrupted");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/nats/client/ConnectionImpl$Srv.class */
    /**
     * One entry of the server pool: the server URI plus reconnect bookkeeping.
     */
    public static class Srv {
        URI url;
        /** Number of reconnect attempts recorded for this server. */
        int reconnects = 0;
        /** {@link System#nanoTime()} reading taken by {@link #updateLastAttempt()}; 0 until then. */
        long lastAttemptNanos = 0;
        boolean implicit;

        /**
         * @param uri server address
         * @param z   value returned by {@link #isImplicit()}
         */
        Srv(URI uri, boolean z) {
            this.url = uri;
            this.implicit = z;
        }

        boolean isImplicit() {
            return this.implicit;
        }

        /** Records the current {@link System#nanoTime()} as the time of the last attempt. */
        void updateLastAttempt() {
            this.lastAttemptNanos = System.nanoTime();
        }

        /**
         * @return milliseconds between {@link #lastAttemptNanos} and now
         */
        long timeSinceLastAttempt() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastAttemptNanos);
        }

        @Override
        public String toString() {
            // %s renders a null url as "null" instead of throwing a NullPointerException.
            return String.format("{url=%s, reconnects=%d, timeSinceLastAttempt=%dms}", this.url, Integer.valueOf(this.reconnects), Long.valueOf(timeSinceLastAttempt()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConnectionImpl(Options options) {
        this.version = null;
        this.nc = null;
        this.opts = null;
        this.tcf = null;
        this.stats = null;
        this.version = getProperties("jnats.properties").getProperty("client.version");
        this.nc = this;
        this.opts = options;
        this.stats = new Statistics();
        if (options.getFactory() != null) {
            this.tcf = options.getFactory();
        } else {
            this.tcf = new TcpConnectionFactory();
        }
    }

    /**
     * Builds the scheduled pool used for the connection's background tasks.
     *
     * @return a pool of NUM_CORE_THREADS threads that removes cancelled tasks from its queue
     */
    ScheduledExecutorService createScheduler() {
        NatsThreadFactory threadFactory = new NatsThreadFactory(EXEC_NAME);
        ScheduledThreadPoolExecutor pool =
                (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(NUM_CORE_THREADS, threadFactory);
        pool.setRemoveOnCancelPolicy(true);
        return pool;
    }

    /**
     * @return a cached thread pool whose threads come from a factory named SUB_EXEC_NAME
     */
    ExecutorService createSubscriptionScheduler() {
        NatsThreadFactory threadFactory = new NatsThreadFactory(SUB_EXEC_NAME);
        return Executors.newCachedThreadPool(threadFactory);
    }

    /**
     * @return a single-thread executor whose thread comes from a factory named CB_EXEC_NAME
     */
    ExecutorService createCallbackScheduler() {
        NatsThreadFactory threadFactory = new NatsThreadFactory(CB_EXEC_NAME);
        return Executors.newSingleThreadExecutor(threadFactory);
    }

    /**
     * Creates fresh executors, flush channel and pong list, forgets all subscriptions and
     * allocates a 1024-byte publish header buffer.
     */
    void setup() {
        // Executors
        exec = createScheduler();
        cbexec = createCallbackScheduler();
        subexec = createSubscriptionScheduler();

        // Signalling structures
        fch = createFlushChannel();
        pongs = createPongs();

        subs.clear();
        buildPublishProtocolBuffer(1024);
    }

    /**
     * Loads a {@link Properties} table from {@code inputStream}. The stream is not closed here.
     *
     * @param inputStream source to read, may be {@code null}
     * @return the loaded properties, or {@code null} when the stream is {@code null} or
     *         reading it fails with an {@link IOException} (which is logged)
     */
    Properties getProperties(InputStream inputStream) {
        if (inputStream == null) {
            return null;
        }
        final Properties loaded = new Properties();
        try {
            loaded.load(inputStream);
            return loaded;
        } catch (IOException e) {
            logger.warn("nats: error loading properties from InputStream", e);
            return null;
        }
    }

    /**
     * Loads a {@link Properties} table from a classpath resource and closes the resource
     * stream afterwards.
     *
     * @param str resource name resolved through this class's class loader
     * @return the loaded properties, or {@code null} when the resource is missing or unreadable
     */
    Properties getProperties(String str) {
        InputStream resource = getClass().getClassLoader().getResourceAsStream(str);
        try {
            return getProperties(resource);
        } finally {
            if (resource != null) {
                try {
                    resource.close();
                } catch (IOException e) {
                    // The properties were already read (or rejected); a failed close changes nothing.
                    this.logger.debug("nats: error closing properties resource {}", str, e);
                }
            }
        }
    }

    /**
     * Allocates the publish header buffer, writes the constant "PUB " prefix into it and marks
     * the position right after the prefix.
     *
     * @param i capacity of the new buffer in bytes
     */
    private void buildPublishProtocolBuffer(int i) {
        pubProtoBuf = ByteBuffer.allocate(i);
        // pubPrimBytesLen equals pubPrimBytes.length, so the whole prefix is copied.
        pubProtoBuf.put(pubPrimBytes, 0, pubPrimBytesLen).mark();
    }

    /**
     * Rebuilds the server pool from the options: the configured server list (shuffled unless
     * randomization is disabled), with the single configured URL, if any, placed first. Falls
     * back to {@code Nats.DEFAULT_URL} when nothing is configured, then selects the head of
     * the pool as the current URL.
     */
    void setupServerPool() {
        final URI primary = opts.getUrl() == null ? null : URI.create(opts.getUrl());
        final List<URI> configured = opts.getServers();

        srvPool = new ArrayList<>();
        urls = new ConcurrentHashMap<>();

        if (configured != null) {
            for (URI server : configured) {
                addUrlToPool(server, false);
            }
        }

        if (!opts.isNoRandomize()) {
            Collections.shuffle(srvPool, new Random(System.nanoTime()));
        }

        // The explicitly configured URL always goes to the front, after any shuffling.
        if (primary != null) {
            srvPool.add(0, new Srv(primary, false));
            urls.put(primary.getAuthority(), primary);
        }

        if (srvPool.isEmpty()) {
            addUrlToPool(Nats.DEFAULT_URL, false);
        }

        setUrl(srvPool.get(0).url);
    }

    /**
     * Parses {@code str} as a URI and appends it to the server pool.
     *
     * @param str server address in URI form
     * @param z   whether the new pool entry is flagged as implicit
     */
    void addUrlToPool(String str, boolean z) {
        final URI parsed = URI.create(str);
        final Srv entry = new Srv(parsed, z);
        srvPool.add(entry);
        urls.put(parsed.getAuthority(), parsed);
    }

    /**
     * Appends {@code uri} to the server pool and indexes it by its authority.
     *
     * @param uri server address
     * @param z   whether the new pool entry is flagged as implicit
     */
    void addUrlToPool(URI uri, boolean z) {
        final Srv entry = new Srv(uri, z);
        srvPool.add(entry);
        urls.put(uri.getAuthority(), uri);
    }

    /**
     * @return the first pool entry whose URL equals {@code getUrl()}, or {@code null} when
     *         no entry matches
     */
    Srv currentServer() {
        for (Srv candidate : srvPool) {
            if (candidate.url.equals(getUrl())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Rotates the pool: the current server is removed and, unless it has used up its reconnect
     * budget, re-appended at the end. The new head of the pool is returned.
     *
     * @return the server to try next
     * @throws IOException with {@code Nats.ERR_NO_SERVERS} when there is no current server or
     *                     the pool is empty after the rotation (the URL is cleared in that case)
     */
    Srv selectNextServer() throws IOException {
        final Srv current = currentServer();
        if (current == null) {
            throw new IOException(Nats.ERR_NO_SERVERS);
        }

        srvPool.remove(current);

        // A negative limit means the server is always kept in the pool.
        final int limit = opts.getMaxReconnect();
        final boolean keep = limit < 0 || current.reconnects < limit;
        if (keep) {
            srvPool.add(current);
        }

        if (srvPool.isEmpty()) {
            setUrl(null);
            throw new IOException(Nats.ERR_NO_SERVERS);
        }
        return srvPool.get(0);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tries each server of the freshly built pool in order until one completes the connect
     * handshake.
     *
     * @return this connection
     * @throws IOException the last handshake failure, or {@code Nats.ERR_NO_SERVERS} when no
     *                     server could be reached at all
     */
    public Connection connect() throws IOException {
        IOException lastFailure = null;
        setupServerPool();
        this.mu.lock();
        try {
            for (Srv srv : this.srvPool) {
                setUrl(srv.url);
                try {
                    this.logger.debug("Connecting to {}", getUrl());
                    createConn();
                    this.logger.debug("Connected to {}", getUrl());
                    setup();
                } catch (IOException e) {
                    if (e.getMessage() != null && e.getMessage().contains("Connection refused")) {
                        setLastError(null);
                    }
                    // No socket was opened, so there is nothing to handshake on: move on to the
                    // next server instead of running processConnectInit() on a dead connection.
                    continue;
                }
                try {
                    processConnectInit();
                    srv.reconnects = 0;
                    lastFailure = null;
                    break;
                } catch (IOException e) {
                    lastFailure = e;
                    // close() takes the lock itself, so release it around the call.
                    this.mu.unlock();
                    close(Nats.ConnState.DISCONNECTED, false);
                    this.mu.lock();
                    setUrl(null);
                } catch (InterruptedException e) {
                    lastFailure = new IOException(e);
                    this.mu.unlock();
                    close(Nats.ConnState.DISCONNECTED, false);
                    this.mu.lock();
                    setUrl(null);
                }
            }
            if (lastFailure == null && this.status != Nats.ConnState.CONNECTED) {
                lastFailure = new IOException(Nats.ERR_NO_SERVERS);
            }
            if (lastFailure != null) {
                throw lastFailure;
            }
            this.cbexec = createCallbackScheduler();
            return this;
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Opens a TCP connection to the current server and installs its buffered streams.
     *
     * @throws IOException when the configured timeout is negative, no current server is
     *                     selected, or the connection cannot be opened
     */
    void createConn() throws IOException {
        if (opts.getConnectionTimeout() < 0) {
            logger.warn("{}: {}", Nats.ERR_BAD_TIMEOUT, Integer.valueOf(opts.getConnectionTimeout()));
            throw new IOException(Nats.ERR_BAD_TIMEOUT);
        }

        final Srv target = currentServer();
        if (target == null) {
            throw new IOException(Nats.ERR_NO_SERVERS);
        }
        target.updateLastAttempt();

        try {
            logger.debug("Opening {}", target.url);
            conn = tcf.createConnection();
            conn.open(target.url.toString(), opts.getConnectionTimeout());
            logger.trace("Opened {} as TcpConnection ({})", target.url, conn);

            // Flush the previous writer before it is replaced, but only while a reconnect
            // buffer exists.
            final boolean hasBufferedOutput = pending != null && bw != null;
            if (hasBufferedOutput) {
                try {
                    bw.flush();
                } catch (IOException flushFailure) {
                    logger.warn(Nats.ERR_TCP_FLUSH_FAILED);
                }
            }

            bw = conn.getOutputStream(DEFAULT_STREAM_BUF_SIZE);
            br = conn.getInputStream(DEFAULT_STREAM_BUF_SIZE);
        } catch (IOException openFailure) {
            logger.debug("Couldn't establish connection to {}: {}", target.url, openFailure.getMessage());
            throw openFailure;
        }
    }

    /**
     * @return a message queue with the largest capacity {@link #createMsgChannel(int)} accepts
     */
    BlockingQueue<Message> createMsgChannel() {
        final int unbounded = Integer.MAX_VALUE;
        return createMsgChannel(unbounded);
    }

    /**
     * @param i requested capacity; values of zero or less are replaced by 1
     * @return a new bounded message queue
     */
    BlockingQueue<Message> createMsgChannel(int i) {
        // FLUSH_CHAN_SIZE is the compile-time constant 1.
        final int capacity = i > 0 ? i : FLUSH_CHAN_SIZE;
        return new LinkedBlockingQueue<>(capacity);
    }

    /**
     * @return a new queue of booleans with the default (Integer.MAX_VALUE) capacity
     */
    BlockingQueue<Boolean> createBooleanChannel() {
        final BlockingQueue<Boolean> channel = new LinkedBlockingQueue<>();
        return channel;
    }

    /**
     * @param i requested capacity; values of zero or less are replaced by 1
     * @return a new bounded queue of booleans
     */
    BlockingQueue<Boolean> createBooleanChannel(int i) {
        // FLUSH_CHAN_SIZE is the compile-time constant 1.
        final int capacity = i > 0 ? i : FLUSH_CHAN_SIZE;
        return new LinkedBlockingQueue<>(capacity);
    }

    /**
     * @return a new queue of capacity FLUSH_CHAN_SIZE used to signal the flusher
     */
    BlockingQueue<Boolean> createFlushChannel() {
        final BlockingQueue<Boolean> channel = new LinkedBlockingQueue<>(FLUSH_CHAN_SIZE);
        return channel;
    }

    /**
     * Fails every flush still waiting for a PONG by handing it {@code false}, then drops the
     * whole pong list. Does nothing when no list exists.
     */
    void clearPendingFlushCalls() {
        final List<BlockingQueue<Boolean>> waiting = pongs;
        if (waiting == null) {
            return;
        }
        for (BlockingQueue<Boolean> waiter : waiting) {
            if (waiter == null) {
                continue;
            }
            // Empty the queue first so the failure marker is the only thing the waiter sees.
            waiter.clear();
            waiter.add(false);
        }
        waiting.clear();
        pongs = null;
    }

    /**
     * Closes the connection, leaving it in the CLOSED state and running the registered
     * callbacks.
     */
    @Override // io.nats.client.AbstractConnection, java.lang.AutoCloseable
    public void close() {
        final boolean runCallbacks = true;
        close(Nats.ConnState.CLOSED, runCallbacks);
    }

    /**
     * Shuts the connection down and leaves it in {@code connState}.
     *
     * <p>Runs in two separately locked phases: the first marks the connection CLOSED and wakes
     * the flusher, the second fails pending flushes, closes all subscriptions, optionally
     * schedules the user callbacks and releases the socket and executors. If the connection is
     * already closed only the status is updated.
     *
     * @param connState state recorded once the shutdown has finished
     * @param z         whether to run the disconnected/closed callbacks and shut down the
     *                  callback executor
     */
    private void close(Nats.ConnState connState, boolean z) {
        this.logger.debug("close({}, {})", connState, String.valueOf(z));

        // Phase 1: flip the state and kick the flusher, then let go of the lock once.
        this.mu.lock();
        try {
            if (closed()) {
                this.status = connState;
                return;
            }
            this.status = Nats.ConnState.CLOSED;
            kickFlusher();
        } finally {
            this.mu.unlock();
        }

        // Phase 2: tear everything down. Each lock() is paired with exactly one unlock();
        // an extra unlock here would raise IllegalMonitorStateException on the normal path.
        this.mu.lock();
        try {
            clearPendingFlushCalls();

            if (this.conn != null && this.bw != null) {
                try {
                    this.bw.flush();
                } catch (IOException ignored) {
                    // Best effort: the connection is going away regardless.
                }
            }

            for (SubscriptionImpl sub : this.subs.values()) {
                sub.lock();
                try {
                    sub.closeChannel();
                    sub.closed = true;
                    sub.connClosed = true;
                    sub.close();
                } finally {
                    sub.unlock();
                }
            }
            this.subs.clear();

            // The callback executor does not exist before setup() has run.
            if (z && this.cbexec != null) {
                if (this.opts.getDisconnectedCallback() != null && this.conn != null) {
                    this.cbexec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.1
                        @Override // java.lang.Runnable
                        public void run() {
                            // The event carries the connection, not this Runnable.
                            ConnectionImpl.this.opts.getDisconnectedCallback().onDisconnect(new ConnectionEvent(ConnectionImpl.this));
                            ConnectionImpl.this.logger.trace("executed DisconnectedCB");
                        }
                    });
                }
                if (this.opts.getClosedCallback() != null) {
                    this.cbexec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.2
                        @Override // java.lang.Runnable
                        public void run() {
                            ConnectionImpl.this.opts.getClosedCallback().onClose(new ConnectionEvent(ConnectionImpl.this));
                            ConnectionImpl.this.logger.trace("executed ClosedCB");
                        }
                    });
                }
                this.cbexec.shutdown();
            }

            this.status = connState;

            if (this.conn != null) {
                this.conn.close();
            }
            if (this.exec != null) {
                shutdownAndAwaitTermination(this.exec, EXEC_NAME);
            }
            if (this.subexec != null) {
                shutdownAndAwaitTermination(this.subexec, SUB_EXEC_NAME);
            }
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Stops {@code executorService} immediately and waits up to ten seconds for its tasks to
     * finish, logging an error if they do not. An interrupt during the wait triggers a second
     * {@code shutdownNow()} and is re-asserted on the calling thread.
     *
     * @param executorService executor to stop
     * @param str             name used in the log message
     */
    void shutdownAndAwaitTermination(ExecutorService executorService, String str) {
        final boolean terminated;
        try {
            executorService.shutdownNow();
            terminated = executorService.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
            return;
        }
        if (!terminated) {
            logger.error("{} did not terminate", str);
        }
    }

    /**
     * Runs the connect handshake on a freshly opened socket: marks the connection CONNECTING,
     * consumes the expected INFO, sends CONNECT, resets the outstanding-ping counter and
     * starts the socket watchers.
     *
     * @throws IOException          propagated from the handshake steps
     * @throws InterruptedException propagated from the handshake steps
     */
    void processConnectInit() throws IOException, InterruptedException {
        status = Nats.ConnState.CONNECTING;

        // Handshake
        processExpectedInfo();
        sendConnect();

        // Post-handshake housekeeping
        setActualPingsOutstanding(0);
        spinUpSocketWatchers();
    }

    /**
     * Verifies that client and server agree on TLS and upgrades the connection when the
     * options ask for a secure connection or the URL scheme is "tls".
     *
     * @throws IOException {@code Nats.ERR_SECURE_CONN_WANTED} when the options ask for TLS but
     *                     the server does not require it, {@code Nats.ERR_SECURE_CONN_REQUIRED}
     *                     in the opposite case, or a failure of the TLS upgrade
     */
    void checkForSecure() throws IOException {
        final boolean clientWantsTls = opts.isSecure();
        final boolean serverRequiresTls = info.isTlsRequired();

        if (clientWantsTls && !serverRequiresTls) {
            throw new IOException(Nats.ERR_SECURE_CONN_WANTED);
        }
        if (serverRequiresTls && !clientWantsTls) {
            throw new IOException(Nats.ERR_SECURE_CONN_REQUIRED);
        }

        if (clientWantsTls || "tls".equals(getUrl().getScheme())) {
            makeTlsConn();
        }
    }

    /**
     * Upgrades the open connection to TLS using the configured SSL context, then re-obtains
     * the buffered streams from it.
     *
     * @throws IOException when the upgrade or the stream setup fails
     */
    void makeTlsConn() throws IOException {
        conn.setTlsDebug(opts.isTlsDebug());
        conn.makeTls(opts.getSslContext());

        // Replace the stream references taken before the upgrade.
        bw = conn.getOutputStream(DEFAULT_STREAM_BUF_SIZE);
        br = conn.getInputStream(DEFAULT_STREAM_BUF_SIZE);
    }

    /**
     * Reads the first control line from the server, which must be an INFO message, applies it
     * and then checks the TLS requirements.
     *
     * <p>NOTE(review): every IOException raised here, including the TLS mismatch errors from
     * checkForSecure(), is handed to processOpError() rather than propagated to the caller;
     * confirm that this is intended.
     *
     * @throws IOException          declared for callers; see the note above
     * @throws InterruptedException when processOpError() is interrupted while taking the lock
     */
    void processExpectedInfo() throws IOException, InterruptedException {
        try {
            Control control = readOp();
            // Constant-first equals: a missing control line or a null op is reported as
            // "no INFO received" instead of surfacing as a NullPointerException.
            if (control == null || !_INFO_OP_.equals(control.op)) {
                throw new IOException(Nats.ERR_NO_INFO_RECEIVED);
            }
            processInfo(control.args);
            checkForSecure();
        } catch (IOException e) {
            processOpError(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends the pre-encoded PONG bytes; a failed send is recorded as the connection's last
     * error instead of being thrown.
     */
    public void processPing() {
        try {
            sendProto(pongProtoBytes, pongProtoBytesLen);
        } catch (IOException sendFailure) {
            setLastError(sendFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a PONG: resets the outstanding-ping counter and, if a flush is waiting, removes
     * the oldest waiter from the pong list and signals it with {@code true}.
     *
     * @throws InterruptedException when interrupted while acquiring the connection lock
     */
    public void processPong() throws InterruptedException {
        mu.lockInterruptibly();
        try {
            BlockingQueue<Boolean> waiter = null;
            if (pongs != null && !pongs.isEmpty()) {
                // remove(0) hands back the element it removed, i.e. the oldest waiter.
                waiter = pongs.remove(0);
            }
            setActualPingsOutstanding(0);
            if (waiter != null) {
                waiter.add(true);
            }
        } finally {
            mu.unlock();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    // Intentionally empty: nothing is done here.
    // NOTE(review): presumably invoked for the server's "+OK" acknowledgement - confirm
    // against the parser, which is not visible from this file section.
    public void processOk() {
    }

    /**
     * Applies a server INFO payload: stores the parsed server info and adds any advertised
     * connect URLs that are not yet known to the pool as implicit servers. The pool is
     * reshuffled when at least one URL was added and randomization is enabled.
     *
     * @param str INFO payload as received; {@code null} or empty input is ignored
     */
    void processInfo(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        setConnectedServerInfo(ServerInfo.createFromWire(str));

        String[] connectUrls = this.info.getConnectUrls();
        if (connectUrls == null) {
            return;
        }

        boolean added = false;
        for (String hostPort : connectUrls) {
            if (!this.urls.containsKey(hostPort)) {
                addUrlToPool(String.format("nats://%s", hostPort), true);
                added = true;
            }
        }

        if (added && !this.opts.isNoRandomize()) {
            Collections.shuffle(this.srvPool);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decodes an INFO payload from a byte range and applies it under the connection lock.
     *
     * @param bArr buffer holding the payload
     * @param i    offset of the first payload byte
     * @param i2   number of payload bytes
     */
    public void processAsyncInfo(byte[] bArr, int i, int i2) {
        // Decode with an explicit charset so the result does not depend on the JVM default.
        // NOTE(review): UTF-8 is assumed because the payload is handed to a JSON-style parser
        // (ServerInfo.createFromWire) - confirm against the protocol definition.
        final String payload = new String(bArr, i, i2, java.nio.charset.StandardCharsets.UTF_8);
        this.mu.lock();
        try {
            processInfo(payload);
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Reacts to a failure of the live connection. Ignored while connecting, reconnecting or
     * closed. When reconnecting is allowed and the connection was CONNECTED, switches to
     * RECONNECTING, cancels the ping timer, closes the socket, redirects output into a
     * reconnect buffer and schedules doReconnect(). Otherwise records the error and closes the
     * connection.
     *
     * @param exc the error that ended the connection
     * @throws InterruptedException when interrupted while acquiring the connection lock
     */
    void processOpError(Exception exc) throws InterruptedException {
        this.mu.lockInterruptibly();
        try {
            if (connecting() || closed() || reconnecting()) {
                return;
            }
            this.logger.debug("Connection terminated: {}", exc.getMessage());

            boolean canReconnect = this.opts.isReconnectAllowed() && this.status == Nats.ConnState.CONNECTED;
            if (!canReconnect) {
                processDisconnect();
                setLastError(exc);
                close();
                return;
            }

            this.status = Nats.ConnState.RECONNECTING;

            if (this.ptmr != null) {
                this.ptmr.cancel(true);
                // tasks is keyed by name, so the timer has to be removed by value;
                // remove(ptmr) would treat the future as a key and never match.
                this.tasks.values().remove(this.ptmr);
            }

            if (this.conn != null) {
                if (this.bw != null) {
                    try {
                        this.bw.flush();
                    } catch (IOException e) {
                        this.logger.warn("I/O error during flush");
                    }
                }
                this.conn.close();
            }

            if (this.fch != null && !this.fch.offer(false)) {
                this.logger.debug("Coudn't shut down flusher following connection error");
            }

            // Everything published from now on is buffered until a server is reachable again.
            setPending(new ByteArrayOutputStream(this.opts.getReconnectBufSize()));
            setOutputStream(getPending());

            if (this.exec.isShutdown()) {
                this.exec = createScheduler();
            }
            this.exec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.3
                @Override // java.lang.Runnable
                public void run() {
                    Thread.currentThread().setName("reconnect");
                    try {
                        ConnectionImpl.this.doReconnect();
                    } catch (InterruptedException e2) {
                        ConnectionImpl.this.logger.warn("nats: interrupted while reonnecting");
                        // Keep the interrupt visible to whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                }
            });
            if (this.cbexec.isShutdown()) {
                this.cbexec = createCallbackScheduler();
            }
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Logs the call and marks the connection as DISCONNECTED.
     */
    protected void processDisconnect() {
        logger.debug("processDisconnect()");
        status = Nats.ConnState.DISCONNECTED;
    }

    /**
     * @return whether the connection is in the RECONNECTING state, read under the lock
     */
    @Override // io.nats.client.AbstractConnection
    public boolean isReconnecting() {
        final boolean result;
        mu.lock();
        try {
            result = reconnecting();
        } finally {
            mu.unlock();
        }
        return result;
    }

    /**
     * Lock-free state check; callers are expected to hold the lock themselves.
     *
     * @return whether the status is RECONNECTING
     */
    boolean reconnecting() {
        return Nats.ConnState.RECONNECTING == status;
    }

    /**
     * @return whether the connection is in the CONNECTED state, read under the lock
     */
    @Override // io.nats.client.AbstractConnection
    public boolean isConnected() {
        final boolean result;
        mu.lock();
        try {
            result = connected();
        } finally {
            mu.unlock();
        }
        return result;
    }

    /**
     * Lock-free state check; callers are expected to hold the lock themselves.
     *
     * @return whether the status is CONNECTED
     */
    boolean connected() {
        return Nats.ConnState.CONNECTED == status;
    }

    /**
     * @return whether the connection is in the CLOSED state, read under the lock
     */
    @Override // io.nats.client.AbstractConnection
    public boolean isClosed() {
        final boolean result;
        mu.lock();
        try {
            result = closed();
        } finally {
            mu.unlock();
        }
        return result;
    }

    /**
     * Unlocked state check.
     *
     * @return {@code true} when {@code status} is {@link Nats.ConnState#CLOSED}
     */
    boolean closed() {
        return Nats.ConnState.CLOSED == this.status;
    }

    /**
     * Writes any bytes buffered in {@code pending} to {@code bw}, flushes the
     * stream, and then drops the pending buffer.
     *
     * <p>An {@link IOException} during the write or flush is logged and otherwise
     * ignored; the pending buffer is cleared either way. Does nothing when there
     * is no pending buffer.
     */
    void flushReconnectPendingItems() {
        if (this.pending != null) {
            if (this.pending.size() > 0) {
                try {
                    final byte[] buffered = this.pending.toByteArray();
                    this.bw.write(buffered, 0, this.pending.size());
                    this.bw.flush();
                } catch (IOException ioe) {
                    this.logger.error("Error flushing pending items", ioe);
                }
            }
            this.pending = null;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:34:0x018a, code lost:
    
        setPending(null);
        r5.status = io.nats.client.Nats.ConnState.CONNECTED;
     */
    /* JADX WARN: Code restructure failed: missing block: B:35:0x019d, code lost:
    
        if (r5.opts.getReconnectedCallback() == null) goto L35;
     */
    /* JADX WARN: Code restructure failed: missing block: B:36:0x01a0, code lost:
    
        r5.logger.trace("Scheduling reconnectedCb from doReconnect()");
        r5.cbexec.submit(new io.nats.client.ConnectionImpl.AnonymousClass5(r5));
        r5.logger.trace("Scheduled reconnectedCb from doReconnect()");
     */
    /* JADX WARN: Code restructure failed: missing block: B:37:0x01ca, code lost:
    
        r5.mu.unlock();
     */
    /* JADX WARN: Code restructure failed: missing block: B:39:0x01d3, code lost:
    
        flush();
     */
    /* JADX WARN: Code restructure failed: missing block: B:42:0x01fd, code lost:
    
        return;
     */
    /* JADX WARN: Code restructure failed: missing block: B:45:0x01da, code lost:
    
        r11 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x01e3, code lost:
    
        if (r5.status == io.nats.client.Nats.ConnState.CONNECTED) goto L40;
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x01e6, code lost:
    
        r5.logger.warn("Error flushing connection", r11);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder left by the decompiler: the real body could not be recovered,
     * so this method unconditionally throws {@link UnsupportedOperationException}.
     *
     * <p>NOTE(review): the recovered fragments in the decompiler warnings suggest the
     * original cleared the pending buffer, set the state to CONNECTED, scheduled the
     * reconnected callback and flushed — the actual logic must be restored from the
     * original sources before this class can be used.
     *
     * @throws java.lang.InterruptedException declared by the original signature
     */
    void doReconnect() throws java.lang.InterruptedException {
        /*
            Method dump skipped, instructions count: 573
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.nats.client.ConnectionImpl.doReconnect():void");
    }

    /**
     * Unlocked state check.
     *
     * @return {@code true} when {@code status} is {@link Nats.ConnState#CONNECTING}
     */
    boolean connecting() {
        return Nats.ConnState.CONNECTING == this.status;
    }

    /**
     * Returns the current connection state without taking any lock.
     *
     * @return the value of the {@code status} field
     */
    Nats.ConnState status() {
        final Nats.ConnState current = this.status;
        return current;
    }

    /**
     * Normalizes a server error line for comparison.
     *
     * <p>Strips the first {@code -ERR} prefix (with its trailing whitespace),
     * lower-cases the text, and removes one leading and one trailing single quote.
     * Lower-casing uses {@code Locale.ROOT} so the result does not depend on the
     * JVM default locale (e.g. Turkish dotless-i would otherwise break the
     * {@code "permissions violation"} prefix match done by callers).
     *
     * @param str the raw error text, may be {@code null}
     * @return the normalized text, or {@code null} when {@code str} is {@code null}
     */
    static String normalizeErr(String str) {
        if (str == null) {
            return null;
        }
        return str.replaceFirst("-ERR\\s+", _EMPTY_)
                .toLowerCase(java.util.Locale.ROOT)
                .replaceAll("^'|'$", _EMPTY_);
    }

    /**
     * Converts the buffer to a string via {@code Parser.bufToString}, trims it, and
     * normalizes it with {@link #normalizeErr(String)}.
     *
     * @param byteBuffer buffer holding the error text
     * @return the normalized text, or {@code null} when the buffer converts to {@code null}
     */
    static String normalizeErr(ByteBuffer byteBuffer) {
        final String raw = Parser.bufToString(byteBuffer);
        final String trimmed = (raw == null) ? null : raw.trim();
        return normalizeErr(trimmed);
    }

    /**
     * Handles an error line received from the server.
     *
     * <ul>
     *   <li>A stale-connection error is forwarded to {@code processOpError}.</li>
     *   <li>A text starting with {@code "permissions violation"} is forwarded to
     *       {@link #processPermissionsViolation(String)}.</li>
     *   <li>Anything else is stored as the last error and the connection is closed.</li>
     * </ul>
     *
     * @param byteBuffer buffer holding the error text
     * @throws InterruptedException propagated from the called handlers
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void processErr(ByteBuffer byteBuffer) throws InterruptedException {
        String errText = normalizeErr(byteBuffer);
        if (STALE_CONNECTION.equalsIgnoreCase(errText)) {
            processOpError(new IOException(Nats.ERR_STALE_CONNECTION));
            return;
        }
        // normalizeErr may return null; guard before calling startsWith.
        if (errText != null && errText.startsWith("permissions violation")) {
            processPermissionsViolation(errText);
            return;
        }
        NATSException nATSException = new NATSException("nats: " + errText);
        nATSException.setConnection(this);
        this.mu.lock();
        try {
            setLastError(nATSException);
        } finally {
            this.mu.unlock();
        }
        // close() runs outside the lock so a failure in it cannot trigger a second unlock.
        close();
    }

    /**
     * Performs the connect handshake on {@code bw}: writes the CONNECT protocol
     * line, optionally waits for the OK acknowledgement in verbose mode, then
     * sends a PING and expects a PONG in return.
     *
     * <p>On a PONG the state becomes {@link Nats.ConnState#CONNECTED}. Any
     * {@link IOException} raised while reading or validating the PING reply is
     * wrapped in a new {@code IOException} carrying {@code Nats.ERR_CONNECTION_READ}.
     *
     * @throws IOException on write failure, an unexpected acknowledgement, or a bad PING reply
     */
    protected void sendConnect() throws IOException {
        this.bw.write(connectProto().getBytes());
        this.bw.flush();

        if (this.opts.isVerbose()) {
            final String ack = readLine();
            if (!_OK_OP_.equals(ack)) {
                throw new IOException(String.format("nats: expected '%s', got '%s'", _OK_OP_, ack));
            }
        }

        this.bw.write(pingProtoBytes, 0, pingProtoBytesLen);
        this.bw.flush();

        try {
            final String reply = readLine();
            if (!PONG_PROTO.trim().equals(reply)) {
                if (reply.startsWith(_ERR_OP_)) {
                    throw new IOException("nats: " + normalizeErr(reply));
                }
                throw new IOException(String.format("nats: expected '%s', got '%s'", _PONG_OP_, reply));
            }
            this.status = Nats.ConnState.CONNECTED;
        } catch (IOException cause) {
            throw new IOException(Nats.ERR_CONNECTION_READ, cause);
        }
    }

    /**
     * Reads one line from the connection's buffered reader.
     *
     * @return the line read; never {@code null}
     * @throws EOFException with {@code Nats.ERR_CONNECTION_CLOSED} when the reader returns {@code null}
     * @throws IOException on any other read failure
     */
    String readLine() throws IOException {
        final String line = this.conn.getBufferedReader().readLine();
        if (line != null) {
            return line;
        }
        throw new EOFException(Nats.ERR_CONNECTION_CLOSED);
    }

    /**
     * Writes the first {@code i} bytes of {@code bArr} to {@code bw} while holding
     * {@code mu}, then kicks the flusher.
     *
     * @param bArr protocol bytes to send
     * @param i number of bytes from the start of {@code bArr} to write
     * @throws IOException if the write fails
     */
    void sendProto(byte[] bArr, int i) throws IOException {
        final int offset = 0;
        this.mu.lock();
        try {
            this.bw.write(bArr, offset, i);
            kickFlusher();
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Builds the CONNECT protocol line.
     *
     * <p>Credentials come from the URL's user-info when present: a single
     * component is treated as a token, two components as user and password.
     * Without user-info, username, password and token are taken from the options.
     *
     * <p>A user-info consisting only of separators (for example {@code ":"}) splits
     * into an empty array; that case is treated as "no credentials" instead of
     * failing with an {@link ArrayIndexOutOfBoundsException}.
     *
     * @return the formatted CONNECT protocol string
     */
    String connectProto() {
        String user = null;
        String pass = null;
        String token = null;
        String userInfo = getUrl().getUserInfo();
        if (userInfo != null) {
            String[] parts = userInfo.split(":");
            if (parts.length > 0 && parts[0].length() > 0) {
                switch (parts.length) {
                    case 1:
                        token = parts[0];
                        break;
                    case 2:
                        user = parts[0];
                        pass = parts[1];
                        break;
                    default:
                        // More than two components: leave all credentials unset, as before.
                        break;
                }
            }
        } else {
            user = this.opts.getUsername();
            pass = this.opts.getPassword();
            token = this.opts.getToken();
        }
        return String.format(CONN_PROTO, new ConnectInfo(this.opts.isVerbose(), this.opts.isPedantic(), user, pass, token, this.opts.isSecure(), this.opts.getConnectionName(), LANG_STRING, this.version, ClientProto.CLIENT_PROTO_INFO));
    }

    /**
     * Reads the next line from the connection and wraps it in a {@code Control}.
     *
     * @return a {@code Control} built from the line read
     * @throws IOException if reading the line fails
     */
    Control readOp() throws IOException {
        final String line = readLine();
        return new Control(line);
    }

    /**
     * Kicks the flusher and, when a done-latch exists, blocks until the socket
     * watcher tasks have counted it down.
     *
     * @throws InterruptedException if interrupted while waiting on the latch
     */
    private void waitForExits() throws InterruptedException {
        kickFlusher();
        if (this.socketWatchersDoneLatch == null) {
            return;
        }
        this.logger.debug("nats: waiting for watcher threads to exit");
        this.socketWatchersDoneLatch.await();
    }

    /**
     * Starts the read-loop and flusher tasks on {@code exec} and resets the ping timer.
     *
     * <p>Any previous watcher tasks are awaited first via {@code waitForExits()}.
     * Two fresh latches are created: the start latch is counted down by each task
     * and once more by this method before the tasks proceed into their main loops;
     * the done latch is counted down by each task when it finishes. The resulting
     * futures are stored in {@code tasks} under {@code READLOOP} and {@code FLUSHER}.
     *
     * @throws InterruptedException if interrupted while waiting for earlier watchers to exit
     */
    protected void spinUpSocketWatchers() throws InterruptedException {
        // Make sure earlier watcher tasks are gone before replacing the latches.
        waitForExits();
        this.socketWatchersDoneLatch = new CountDownLatch(2);
        this.socketWatchersStartLatch = new CountDownLatch(2);
        this.tasks.put(READLOOP, this.exec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.6
            @Override // java.lang.Runnable
            public void run() {
                Thread.currentThread().setName(ConnectionImpl.READLOOP);
                ConnectionImpl.this.logger.debug("{} starting...", ConnectionImpl.READLOOP);
                ConnectionImpl.this.socketWatchersStartLatch.countDown();
                try {
                    // Wait until the start latch has been released before reading.
                    ConnectionImpl.this.socketWatchersStartLatch.await();
                    ConnectionImpl.this.readLoop();
                } catch (InterruptedException e) {
                    ConnectionImpl.this.logger.debug("{} interrupted", ConnectionImpl.READLOOP);
                    // Restore the interrupt flag for the executor.
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    ConnectionImpl.this.logger.error("Unexpected exception in {}", ConnectionImpl.READLOOP, e2);
                } finally {
                    // Always signal completion so waitForExits() can return.
                    ConnectionImpl.this.socketWatchersDoneLatch.countDown();
                }
                ConnectionImpl.this.logger.debug("{} exiting", ConnectionImpl.READLOOP);
            }
        }));
        this.tasks.put(FLUSHER, this.exec.submit(new Runnable() { // from class: io.nats.client.ConnectionImpl.7
            @Override // java.lang.Runnable
            public void run() {
                Thread.currentThread().setName(ConnectionImpl.FLUSHER);
                ConnectionImpl.this.logger.debug("{} starting...", ConnectionImpl.FLUSHER);
                ConnectionImpl.this.socketWatchersStartLatch.countDown();
                try {
                    // Wait until the start latch has been released before flushing.
                    ConnectionImpl.this.socketWatchersStartLatch.await();
                    ConnectionImpl.this.flusher();
                } catch (InterruptedException e) {
                    ConnectionImpl.this.logger.debug("{} interrupted", ConnectionImpl.FLUSHER);
                    // Restore the interrupt flag for the executor.
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    ConnectionImpl.this.logger.error("Unexpected exception in {}", ConnectionImpl.FLUSHER, e2);
                } finally {
                    // Always signal completion so waitForExits() can return.
                    ConnectionImpl.this.socketWatchersDoneLatch.countDown();
                }
                ConnectionImpl.this.logger.debug("{} exiting", ConnectionImpl.FLUSHER);
            }
        }));
        // Third countDown on a latch of 2: a no-op once the count has reached zero.
        this.socketWatchersStartLatch.countDown();
        resetPingTimer();
    }

    /**
     * Reads bytes from {@code br} and feeds them to the parser until the thread is
     * interrupted, the connection is closed or reconnecting, or {@code conn} is null.
     *
     * <p>The parser state is created on entry if missing, reset when the loop
     * stops because of a closed/reconnecting state, and cleared on exit. Read and
     * parse failures are logged and, unless the connection is already closed,
     * handed to {@code processOpError}.
     *
     * <p>Each critical section releases {@code mu} exactly once in a
     * {@code finally} block; the blocking read and the parse run outside the lock.
     *
     * @throws InterruptedException if interrupted while acquiring {@code mu}
     */
    void readLoop() throws InterruptedException {
        Parser parser;
        this.mu.lockInterruptibly();
        try {
            parser = this.parser;
            if (parser.ps == null) {
                parser.ps = new Parser.ParseState();
            }
        } finally {
            this.mu.unlock();
        }

        byte[] buffer = new byte[65536];
        while (!Thread.currentThread().isInterrupted()) {
            boolean stop;
            TcpConnection current;
            this.mu.lockInterruptibly();
            try {
                stop = closed() || reconnecting();
                if (stop) {
                    parser.ps = new Parser.ParseState();
                }
                current = this.conn;
            } finally {
                this.mu.unlock();
            }
            if (stop || current == null) {
                break;
            }
            try {
                int read = this.br.read(buffer);
                if (read == -1) {
                    throw new IOException(Nats.ERR_STALE_CONNECTION);
                }
                parser.parse(buffer, read);
            } catch (IOException | ParseException e) {
                this.logger.debug("Exception in readloop(): '{}' (state: {})", e.getMessage(), this.status);
                if (this.status != Nats.ConnState.CLOSED) {
                    processOpError(e);
                }
            }
        }

        this.mu.lockInterruptibly();
        try {
            parser.ps = null;
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Delivery loop for an asynchronous subscription: waits for queued messages
     * and passes each one to the subscription's message handler.
     *
     * <p>Bookkeeping (pending counters, delivered count, snapshot of handler/max/
     * closed) happens under the subscription lock. The lock is released before the
     * handler is invoked and before {@code mu} is taken for {@code removeSub}, so
     * user code never runs under the subscription lock and the lock order stays
     * {@code mu} then subscription, matching {@code processMsg}.
     *
     * <p>The loop ends when the subscription is closed, when its channel is gone,
     * or when the auto-unsubscribe maximum has been reached (in which case the
     * subscription is removed).
     *
     * @param asyncSubscriptionImpl the subscription to service
     * @throws InterruptedException if interrupted while waiting for messages
     */
    void waitForMsgs(AsyncSubscriptionImpl asyncSubscriptionImpl) throws InterruptedException {
        long delivered = 0;
        while (true) {
            Message msg;
            MessageHandler handler;
            long max;
            boolean subClosed;

            asyncSubscriptionImpl.lock();
            try {
                BlockingQueue<Message> channel = asyncSubscriptionImpl.getChannel();
                if (channel == null) {
                    // removeSub() nulls the channel when tearing the subscription down.
                    return;
                }
                while (channel.size() == 0 && !asyncSubscriptionImpl.isClosed()) {
                    asyncSubscriptionImpl.pCond.await();
                }
                msg = channel.poll();
                if (msg != null) {
                    asyncSubscriptionImpl.pMsgs -= 1;
                    asyncSubscriptionImpl.pBytes -= msg.getData() == null ? 0 : msg.getData().length;
                }
                handler = asyncSubscriptionImpl.getMessageHandler();
                max = asyncSubscriptionImpl.max;
                subClosed = asyncSubscriptionImpl.isClosed();
                if (!subClosed) {
                    asyncSubscriptionImpl.delivered++;
                    delivered = asyncSubscriptionImpl.delivered;
                }
            } finally {
                asyncSubscriptionImpl.unlock();
            }

            if (subClosed) {
                return;
            }
            if (msg != null && (max <= 0 || delivered <= max)) {
                handler.onMessage(msg);
            }
            if (max > 0 && delivered >= max) {
                this.mu.lock();
                try {
                    removeSub(asyncSubscriptionImpl);
                } finally {
                    this.mu.unlock();
                }
                return;
            }
        }
    }

    /**
     * Routes an incoming message to its subscription.
     *
     * <p>Updates the inbound statistics, looks up the subscription by the sid in
     * the parser state, and drops the message when no such subscription exists.
     * Otherwise the pending counters and their high-water marks are updated; the
     * message is queued on the subscription's channel, or treated as a
     * slow-consumer drop when a pending limit is exceeded or the queue refuses it.
     *
     * <p>Both {@code mu} and the subscription lock are released exactly once, in
     * {@code finally} blocks.
     *
     * @param bArr buffer containing the payload
     * @param i offset of the payload in {@code bArr}
     * @param i2 payload length, also added to the inbound byte count
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void processMsg(byte[] bArr, int i, int i2) {
        this.mu.lock();
        try {
            this.stats.incrementInMsgs();
            this.stats.incrementInBytes(i2);
            SubscriptionImpl sub = this.subs.get(Long.valueOf(this.parser.ps.ma.sid));
            if (sub == null) {
                return;
            }
            Message message = new Message(this.parser.ps.ma, sub, bArr, i, i2);
            sub.lock();
            try {
                sub.pMsgs += 1;
                if (sub.pMsgs > sub.pMsgsMax) {
                    sub.pMsgsMax = sub.pMsgs;
                }
                sub.pBytes += message.getData() == null ? 0 : message.getData().length;
                if (sub.pBytes > sub.pBytesMax) {
                    sub.pBytesMax = sub.pBytes;
                }
                boolean overMsgLimit = sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit;
                boolean overByteLimit = sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit;
                if (overMsgLimit || overByteLimit) {
                    handleSlowConsumer(sub, message);
                } else if (sub.getChannel() != null) {
                    if (sub.getChannel().add(message)) {
                        sub.pCond.signal();
                        sub.setSlowConsumer(false);
                    } else {
                        handleSlowConsumer(sub, message);
                    }
                }
            } finally {
                sub.unlock();
            }
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Accounts for a message that could not be delivered: bumps the dropped
     * counter, reports the slow consumer, and rolls the pending message/byte
     * counters back by this message.
     *
     * @param subscriptionImpl subscription that could not accept the message
     * @param message the dropped message
     */
    void handleSlowConsumer(SubscriptionImpl subscriptionImpl, Message message) {
        subscriptionImpl.dropped++;
        processSlowConsumer(subscriptionImpl);
        subscriptionImpl.pMsgs--;
        if (message.getData() == null) {
            return;
        }
        subscriptionImpl.pBytes -= message.getData().length;
    }

    /**
     * Unregisters a subscription from {@code subs} and tears it down: clears and
     * drops its channel (if any), detaches it from this connection, and marks it
     * closed. The teardown runs under the subscription's own lock.
     *
     * @param subscriptionImpl subscription to remove
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSub(SubscriptionImpl subscriptionImpl) {
        final Long key = Long.valueOf(subscriptionImpl.getSid());
        this.subs.remove(key);

        subscriptionImpl.lock();
        try {
            final boolean hasChannel = subscriptionImpl.getChannel() != null;
            if (hasChannel) {
                subscriptionImpl.mch.clear();
                subscriptionImpl.mch = null;
            }
            subscriptionImpl.setConnection(null);
            subscriptionImpl.closed = true;
        } finally {
            subscriptionImpl.unlock();
        }
    }

    /**
     * Records a slow-consumer error as the last error and, the first time the
     * subscription is flagged, schedules the configured exception handler on
     * {@code cbexec}. The subscription is always left flagged as a slow consumer.
     *
     * @param subscriptionImpl the subscription that fell behind
     */
    void processSlowConsumer(SubscriptionImpl subscriptionImpl) {
        final IOException cause = new IOException(Nats.ERR_SLOW_CONSUMER);
        final NATSException wrapped = new NATSException(cause, this, subscriptionImpl);
        setLastError(cause);
        if (this.opts.getExceptionHandler() != null) {
            if (!subscriptionImpl.isSlowConsumer()) {
                this.cbexec.submit(new Runnable() {
                    @Override
                    public void run() {
                        ConnectionImpl.this.opts.getExceptionHandler().onException(wrapped);
                    }
                });
            }
        }
        subscriptionImpl.setSlowConsumer(true);
    }

    /**
     * Records a permissions-violation error as the last error and, when an
     * exception handler is configured, schedules it on {@code cbexec}.
     *
     * @param str normalized error text from the server
     */
    void processPermissionsViolation(String str) {
        final IOException cause = new IOException("nats: " + str);
        final NATSException wrapped = new NATSException(cause);
        wrapped.setConnection(this);
        setLastError(cause);
        if (this.opts.getExceptionHandler() == null) {
            return;
        }
        this.cbexec.submit(new Runnable() {
            @Override
            public void run() {
                ConnectionImpl.this.opts.getExceptionHandler().onException(wrapped);
            }
        });
    }

    /**
     * Removes a pending flush channel from {@code pongs}.
     *
     * <p>The first entry equal to {@code blockingQueue} is cleared and removed.
     * {@code mu} is released exactly once via {@code finally}.
     *
     * @param blockingQueue the flush channel to remove
     * @return {@code true} if an entry was removed; {@code false} if there is no
     *         pong list or no matching entry
     * @throws InterruptedException if interrupted while acquiring {@code mu}
     */
    boolean removeFlushEntry(BlockingQueue<Boolean> blockingQueue) throws InterruptedException {
        this.mu.lockInterruptibly();
        try {
            if (this.pongs == null) {
                return false;
            }
            Iterator<BlockingQueue<Boolean>> it = this.pongs.iterator();
            while (it.hasNext()) {
                BlockingQueue<Boolean> candidate = it.next();
                if (candidate.equals(blockingQueue)) {
                    candidate.clear();
                    it.remove();
                    return true;
                }
            }
            return false;
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Registers an optional reply channel in {@code pongs} (creating the list on
     * first use) and writes a PING to {@code bw}, flushing immediately.
     *
     * <p>An {@link IOException} from the write or flush is stored as the last
     * error rather than thrown.
     *
     * @param blockingQueue channel to register for the matching PONG, or {@code null} for none
     */
    void sendPing(BlockingQueue<Boolean> blockingQueue) {
        if (null == this.pongs) {
            this.pongs = createPongs();
        }
        if (null != blockingQueue) {
            this.pongs.add(blockingQueue);
        }
        try {
            this.bw.write(pingProtoBytes, 0, pingProtoBytesLen);
            this.bw.flush();
        } catch (IOException writeFailure) {
            setLastError(writeFailure);
        }
    }

    /**
     * Creates the list that holds pending flush channels.
     *
     * @return a new, empty, mutable list
     */
    List<BlockingQueue<Boolean>> createPongs() {
        // Diamond instead of the raw type, so no unchecked conversion is needed.
        return new ArrayList<>();
    }

    /**
     * Schedules a {@code PingTimerTask} on {@code exec} with a fixed delay, using
     * the configured ping interval (milliseconds) as both initial delay and period.
     *
     * @return the future for the scheduled task
     */
    ScheduledFuture<?> createPingTimer() {
        final PingTimerTask task = new PingTimerTask();
        final long initialDelay = this.opts.getPingInterval();
        final long delay = this.opts.getPingInterval();
        return this.exec.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the current ping timer (if any) and, when the configured ping
     * interval is positive, starts a new one and registers it in {@code tasks}
     * under {@code PINGTIMER}.
     *
     * <p>The old registration is removed by its {@code PINGTIMER} key; entries in
     * {@code tasks} are keyed by task name, so removing by the future object
     * would leave the cancelled timer registered.
     */
    void resetPingTimer() {
        this.mu.lock();
        try {
            if (this.ptmr != null) {
                this.ptmr.cancel(true);
                this.tasks.remove(PINGTIMER);
            }
            if (this.opts.getPingInterval() > 0) {
                this.ptmr = createPingTimer();
                this.tasks.put(PINGTIMER, this.ptmr);
            }
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Writes an UNSUB protocol line for the subscription to {@code bw}.
     *
     * <p>The max argument is included only when {@code j} is positive; otherwise
     * an empty string is formatted in and any spaces left before the line
     * terminator are collapsed.
     *
     * @param subscriptionImpl subscription whose sid is sent
     * @param j auto-unsubscribe maximum, or a non-positive value for none
     * @throws IOException if the write fails
     */
    void writeUnsubProto(SubscriptionImpl subscriptionImpl, long j) throws IOException {
        final Object sid = Long.valueOf(subscriptionImpl.getSid());
        final Object maxText;
        if (j > 0) {
            maxText = Long.toString(j);
        } else {
            maxText = _EMPTY_;
        }
        final String line = String.format(UNSUB_PROTO, sid, maxText).replaceAll(" +\r\n", CRLF);
        this.bw.write(line.getBytes());
    }

    /**
     * Unsubscribes with an {@code int} maximum by delegating to the {@code long}
     * overload.
     *
     * <p>The explicit widening cast is required: without it the call resolves to
     * this same {@code int} overload and recurses until the stack overflows.
     *
     * @param subscriptionImpl subscription to unsubscribe
     * @param i auto-unsubscribe maximum, or a non-positive value to unsubscribe immediately
     * @throws IOException if writing the UNSUB protocol fails
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void unsubscribe(SubscriptionImpl subscriptionImpl, int i) throws IOException {
        unsubscribe(subscriptionImpl, (long) i);
    }

    /**
     * Unsubscribes a subscription, immediately or after {@code j} messages.
     *
     * <p>Does nothing when the subscription is not registered. With a positive
     * {@code j} the subscription's max is set; otherwise the subscription is
     * removed. The UNSUB protocol is written unless the connection is
     * reconnecting, and the flusher is kicked. {@code mu} is released exactly once
     * via {@code finally}.
     *
     * @param subscriptionImpl subscription to unsubscribe
     * @param j auto-unsubscribe maximum, or a non-positive value to unsubscribe immediately
     * @throws IOException if writing the UNSUB protocol fails
     * @throws IllegalStateException if the connection is closed
     */
    protected void unsubscribe(SubscriptionImpl subscriptionImpl, long j) throws IOException {
        this.mu.lock();
        try {
            if (isClosed()) {
                throw new IllegalStateException(Nats.ERR_CONNECTION_CLOSED);
            }
            SubscriptionImpl registered = this.subs.get(Long.valueOf(subscriptionImpl.getSid()));
            if (registered == null) {
                return;
            }
            if (j > 0) {
                registered.setMax(j);
            } else {
                removeSub(registered);
            }
            if (!reconnecting()) {
                writeUnsubProto(registered, j);
            }
            kickFlusher();
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Signals the flusher by offering {@code true} to the flush channel. Does
     * nothing when either the output stream or the flush channel is absent.
     */
    protected void kickFlusher() {
        if (this.bw != null && this.fch != null) {
            this.fch.offer(Boolean.TRUE);
        }
    }

    /**
     * Flusher loop: waits for kicks on the flush channel and flushes the output
     * stream for each one.
     *
     * <p>The stream, connection and flush channel are captured once at start. The
     * loop returns as soon as the connection is no longer connected or either the
     * stream or the connection has been replaced, and ends when {@code false} is
     * taken from the channel. An {@link IOException} from the flush is logged and
     * stored as the last error. After each round the thread sleeps for the
     * configured flush interval, outside the lock.
     *
     * <p>{@code mu} is released exactly once per critical section via {@code finally}.
     *
     * @throws InterruptedException if interrupted while locking, waiting for a kick, or sleeping
     */
    protected void flusher() throws InterruptedException {
        OutputStream out;
        TcpConnection tcp;
        BlockingQueue<Boolean> kicks;
        this.mu.lockInterruptibly();
        try {
            out = this.bw;
            tcp = this.conn;
            kicks = this.fch;
        } finally {
            this.mu.unlock();
        }
        if (tcp == null || out == null) {
            return;
        }
        while (kicks.take().booleanValue()) {
            this.mu.lockInterruptibly();
            try {
                // Bail out if the connection state or its streams changed underneath us.
                if (!connected() || connecting() || out != this.bw || tcp != this.conn) {
                    return;
                }
                out.flush();
                this.stats.incrementFlushes();
            } catch (IOException e) {
                this.logger.debug("I/O exception encountered during flush");
                setLastError(e);
            } finally {
                this.mu.unlock();
            }
            Thread.sleep(this.flushTimerUnit.toMillis(this.flushTimerInterval));
        }
        this.logger.debug("flusher id:{} exiting", Long.valueOf(Thread.currentThread().getId()));
    }

    /**
     * Sends a PING and waits up to {@code i} milliseconds for the reply.
     *
     * <p>Only the PING registration and send happen under {@code mu}. The wait
     * for the reply runs outside the lock, because the read loop and
     * {@code processMsg} both need {@code mu}; holding it for the whole timeout
     * would stall inbound processing.
     *
     * @param i timeout in milliseconds; must be positive
     * @throws IllegalArgumentException if {@code i} is not positive
     * @throws IllegalStateException if the connection is closed, or a {@code false} reply is received
     * @throws IOException with {@code Nats.ERR_TIMEOUT} if no reply arrives in time
     * @throws InterruptedException if interrupted while locking or waiting
     */
    @Override // io.nats.client.AbstractConnection
    public void flush(int i) throws IOException, InterruptedException {
        if (i <= 0) {
            throw new IllegalArgumentException(Nats.ERR_BAD_TIMEOUT);
        }
        BlockingQueue<Boolean> pongChannel;
        this.mu.lockInterruptibly();
        try {
            if (closed()) {
                throw new IllegalStateException(Nats.ERR_CONNECTION_CLOSED);
            }
            pongChannel = createBooleanChannel(FLUSH_CHAN_SIZE);
            sendPing(pongChannel);
        } finally {
            this.mu.unlock();
        }

        Boolean reply = pongChannel.poll(i, TimeUnit.MILLISECONDS);
        if (reply == null) {
            // Timed out: withdraw our channel so a late PONG is not delivered to it.
            removeFlushEntry(pongChannel);
            throw new IOException(Nats.ERR_TIMEOUT);
        }
        if (!reply.booleanValue()) {
            throw new IllegalStateException(Nats.ERR_CONNECTION_CLOSED);
        }
        pongChannel.clear();
    }

    /**
     * Flushes with a timeout of 60000 milliseconds.
     *
     * @throws IOException see {@link #flush(int)}
     * @throws InterruptedException see {@link #flush(int)}
     */
    @Override // io.nats.client.AbstractConnection
    public void flush() throws IOException, InterruptedException {
        final int timeoutMillis = 60000;
        flush(timeoutMillis);
    }

    /**
     * Re-sends every registered subscription to the server.
     *
     * <p>For a subscription with an auto-unsubscribe maximum, the remaining
     * message count is computed per subscription. If nothing remains, the
     * subscription is unsubscribed and not re-sent. Otherwise the subscription
     * message is sent, followed by an UNSUB carrying the remaining count when one
     * applies. The subscription lock is released exactly once via {@code finally}.
     */
    void resendSubscriptions() {
        for (SubscriptionImpl sub : this.subs.values()) {
            // Computed fresh for each subscription so one subscription's remainder
            // never leaks into the next.
            long remaining = 0;
            boolean exhausted = false;
            sub.lock();
            try {
                if (sub.max > 0) {
                    if (sub.delivered < sub.max) {
                        remaining = sub.max - sub.delivered;
                    }
                    if (remaining == 0) {
                        // Max already reached: drop the subscription instead of re-sending it.
                        exhausted = true;
                        try {
                            // Explicit long literal selects the long overload.
                            unsubscribe(sub, 0L);
                        } catch (Exception e) {
                            this.logger.debug("nats: exception while unsubscribing exhausted subscription", e);
                        }
                    }
                }
            } finally {
                sub.unlock();
            }
            if (exhausted) {
                continue;
            }
            sendSubscriptionMessage(sub);
            if (remaining > 0) {
                try {
                    writeUnsubProto(sub, remaining);
                } catch (Exception e2) {
                    this.logger.debug("nats: exception while writing UNSUB proto");
                }
            }
        }
    }

    /**
     * Creates and registers a subscription.
     *
     * <p>With a message handler, an asynchronous subscription is created and its
     * delivery loop is submitted to {@code subexec}; otherwise a synchronous
     * subscription backed by {@code blockingQueue} is created. The subscription is
     * registered, announced to the server unless the connection is reconnecting,
     * and the flusher is kicked. Everything runs under {@code mu}.
     *
     * @param str subject
     * @param str2 queue group, may be {@code null}
     * @param messageHandler handler for asynchronous delivery, or {@code null}
     * @param blockingQueue channel for synchronous delivery, or {@code null}
     * @return the new subscription
     * @throws IllegalStateException if the connection is closed
     * @throws IllegalArgumentException if both handler and channel are {@code null}
     */
    SubscriptionImpl subscribe(String str, String str2, MessageHandler messageHandler, BlockingQueue<Message> blockingQueue) {
        this.mu.lock();
        try {
            if (closed()) {
                throw new IllegalStateException(Nats.ERR_CONNECTION_CLOSED);
            }
            if (messageHandler == null && blockingQueue == null) {
                throw new IllegalArgumentException(Nats.ERR_BAD_SUBSCRIPTION);
            }

            final SubscriptionImpl created;
            if (messageHandler == null) {
                created = new SyncSubscriptionImpl(this, str, str2);
                created.setChannel(blockingQueue);
            } else {
                created = new AsyncSubscriptionImpl(this, str, str2, messageHandler);
                this.logger.debug("Starting subscription for subject '{}'", str);
                this.subexec.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            ConnectionImpl.this.waitForMsgs((AsyncSubscriptionImpl) created);
                        } catch (InterruptedException interrupted) {
                            ConnectionImpl.this.logger.debug("Interrupted in waitForMsgs");
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }

            addSubscription(created);
            if (!reconnecting()) {
                sendSubscriptionMessage(created);
            }
            kickFlusher();
            return created;
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Creates a synchronous subscription on a subject with no queue group.
     *
     * @param str subject
     * @return the new synchronous subscription
     */
    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribe(String str) {
        final String noQueue = null;
        return subscribeSync(str, noQueue);
    }

    /**
     * Creates a synchronous subscription on a subject and queue group.
     *
     * @param str subject
     * @param str2 queue group
     * @return the new synchronous subscription
     */
    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribe(String str, String str2) {
        final SyncSubscription created = subscribeSync(str, str2);
        return created;
    }

    /**
     * Creates an asynchronous subscription on a subject with no queue group.
     *
     * @param str subject
     * @param messageHandler handler invoked for each message
     * @return the new asynchronous subscription
     */
    @Override // io.nats.client.AbstractConnection
    public AsyncSubscription subscribe(String str, MessageHandler messageHandler) {
        final String noQueue = null;
        return subscribe(str, noQueue, messageHandler);
    }

    /**
     * Creates an asynchronous subscription on a subject and queue group.
     *
     * @param str subject
     * @param str2 queue group, may be {@code null}
     * @param messageHandler handler invoked for each message
     * @return the new asynchronous subscription
     */
    @Override // io.nats.client.AbstractConnection
    public AsyncSubscription subscribe(String str, String str2, MessageHandler messageHandler) {
        final SubscriptionImpl created = subscribe(str, str2, messageHandler, null);
        return (AsyncSubscriptionImpl) created;
    }

    /**
     * Creates an asynchronous subscription on a subject and queue group.
     *
     * @param str subject
     * @param str2 queue group, may be {@code null}
     * @param messageHandler handler invoked for each message
     * @return the new asynchronous subscription
     * @deprecated marked deprecated in this class; the three-argument
     *             {@code subscribe} performs the same call
     */
    @Override // io.nats.client.AbstractConnection
    @Deprecated
    public AsyncSubscription subscribeAsync(String str, String str2, MessageHandler messageHandler) {
        final SubscriptionImpl created = subscribe(str, str2, messageHandler, null);
        return (AsyncSubscriptionImpl) created;
    }

    /**
     * Creates an asynchronous subscription on a subject with no queue group.
     *
     * @param str subject
     * @param messageHandler handler invoked for each message
     * @return the new asynchronous subscription
     * @deprecated marked deprecated in this class; delegates to the
     *             three-argument {@code subscribe}
     */
    @Override // io.nats.client.AbstractConnection
    @Deprecated
    public AsyncSubscription subscribeAsync(String str, MessageHandler messageHandler) {
        final String noQueue = null;
        return subscribe(str, noQueue, messageHandler);
    }

    /**
     * Assigns the next sid from {@code sidCounter} to the subscription and
     * registers it in {@code subs} under that sid.
     *
     * @param subscriptionImpl subscription to register
     */
    private void addSubscription(SubscriptionImpl subscriptionImpl) {
        final long nextSid = this.sidCounter.incrementAndGet();
        subscriptionImpl.setSid(nextSid);
        final Long key = Long.valueOf(subscriptionImpl.getSid());
        this.subs.put(key, subscriptionImpl);
    }

    /**
     * Creates a synchronous subscription on a subject and queue group, backed by a
     * freshly created message channel.
     *
     * @param str subject
     * @param str2 queue group, may be {@code null}
     * @return the new synchronous subscription
     */
    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribeSync(String str, String str2) {
        final SubscriptionImpl created = subscribe(str, str2, null, createMsgChannel());
        return (SyncSubscription) created;
    }

    /**
     * Creates a synchronous subscription on a subject with no queue group, backed
     * by a freshly created message channel.
     *
     * @param str subject
     * @return the new synchronous subscription
     */
    @Override // io.nats.client.AbstractConnection
    public SyncSubscription subscribeSync(String str) {
        final SubscriptionImpl created = subscribe(str, null, null, createMsgChannel());
        return (SyncSubscription) created;
    }

    /**
     * Appends the variable part of a PUB protocol line to {@code pubProtoBuf}:
     * subject, optional reply subject, the payload size in decimal ASCII, and the
     * line terminator, separated by single spaces.
     *
     * <p>Note that the {@code byteBuffer} argument is not used; output always goes
     * to the {@code pubProtoBuf} field. A non-positive size is written as a single
     * zero digit.
     *
     * @param byteBuffer unused
     * @param bArr subject bytes
     * @param bArr2 reply-subject bytes, or {@code null} for none
     * @param i payload size
     */
    void writePublishProto(ByteBuffer byteBuffer, byte[] bArr, byte[] bArr2, int i) {
        final byte space = (byte) 32;

        this.pubProtoBuf.put(bArr, 0, bArr.length);
        if (bArr2 != null) {
            this.pubProtoBuf.put(space);
            this.pubProtoBuf.put(bArr2, 0, bArr2.length);
        }
        this.pubProtoBuf.put(space);

        // Render the size right-to-left into a scratch array.
        final byte[] sizeDigits = new byte[12];
        int pos = sizeDigits.length;
        if (i <= 0) {
            sizeDigits[--pos] = digits[0];
        } else {
            for (int rest = i; rest > 0; rest /= 10) {
                sizeDigits[--pos] = digits[rest % 10];
            }
        }

        this.pubProtoBuf.put(sizeDigits, pos, sizeDigits.length - pos);
        this.pubProtoBuf.put(crlfProtoBytes, 0, crlfProtoBytesLen);
    }

    /**
     * Publishes a message given as raw bytes.
     *
     * <p>Under {@code mu}: validates the payload size and connection state, checks
     * the reconnect buffer limit while reconnecting, builds the PUB protocol line
     * (reallocating the protocol buffer once on overflow), then writes the line,
     * the payload and the terminator to {@code bw}. The stream is flushed
     * immediately when {@code z} is set; otherwise the flusher is kicked if the
     * flush channel is empty. An {@link IOException} while writing is stored as
     * the last error rather than thrown.
     *
     * @param bArr subject bytes
     * @param bArr2 reply-subject bytes, or {@code null}
     * @param bArr3 payload, or {@code null} for an empty payload
     * @param z whether to flush immediately
     * @throws IllegalArgumentException if the payload exceeds the server's maximum
     * @throws IllegalStateException if the connection is closed
     * @throws IOException if the reconnect buffer limit has been reached
     */
    void publish(byte[] bArr, byte[] bArr2, byte[] bArr3, boolean z) throws IOException {
        final int payloadLen = (bArr3 == null) ? 0 : bArr3.length;
        this.mu.lock();
        try {
            if (payloadLen > this.info.getMaxPayload()) {
                throw new IllegalArgumentException(Nats.ERR_MAX_PAYLOAD);
            }
            if (closed()) {
                throw new IllegalStateException(Nats.ERR_CONNECTION_CLOSED);
            }
            if (reconnecting()) {
                try {
                    this.bw.flush();
                } catch (IOException flushFailure) {
                    this.logger.error("I/O exception during flush");
                }
                if (this.pending.size() >= this.opts.getReconnectBufSize()) {
                    throw new IOException(Nats.ERR_RECONNECT_BUF_EXCEEDED);
                }
            }

            try {
                writePublishProto(this.pubProtoBuf, bArr, bArr2, payloadLen);
            } catch (BufferOverflowException overflow) {
                this.logger.warn("nats: reallocating publish buffer due to overflow");
                final int replyLen = (bArr2 == null) ? 0 : bArr2.length;
                buildPublishProtocolBuffer(1024 + bArr.length + replyLen);
                writePublishProto(this.pubProtoBuf, bArr, bArr2, payloadLen);
            }

            try {
                this.bw.write(this.pubProtoBuf.array(), 0, this.pubProtoBuf.position());
                this.pubProtoBuf.position(pubPrimBytesLen);
                if (payloadLen > 0) {
                    this.bw.write(bArr3, 0, payloadLen);
                }
                this.bw.write(crlfProtoBytes, 0, crlfProtoBytesLen);
                this.stats.incrementOutMsgs();
                this.stats.incrementOutBytes(payloadLen);
                if (z) {
                    this.bw.flush();
                    this.stats.incrementFlushes();
                } else if (this.fch.isEmpty()) {
                    kickFlusher();
                }
            } catch (IOException writeFailure) {
                setLastError(writeFailure);
            }
        } finally {
            this.mu.unlock();
        }
    }

    /**
     * Publishes a payload to a subject with an optional reply subject.
     *
     * @param str subject; must be non-null and non-empty
     * @param str2 reply subject, or {@code null}
     * @param bArr payload, may be {@code null}
     * @param z whether to flush immediately
     * @throws NullPointerException if {@code str} is {@code null}
     * @throws IllegalArgumentException if {@code str} is empty
     * @throws IOException propagated from the byte-array overload
     */
    @Override // io.nats.client.Connection
    public void publish(String str, String str2, byte[] bArr, boolean z) throws IOException {
        if (str == null) {
            throw new NullPointerException(Nats.ERR_BAD_SUBJECT);
        }
        if (str.isEmpty()) {
            throw new IllegalArgumentException(Nats.ERR_BAD_SUBJECT);
        }
        final byte[] subjectBytes = str.getBytes();
        final byte[] replyBytes = (str2 == null) ? null : str2.getBytes();
        publish(subjectBytes, replyBytes, bArr, z);
    }

    /**
     * Publishes a payload with a reply subject, delegating to the four-argument overload
     * with its boolean flag set to {@code false}.
     *
     * @param str the subject
     * @param str2 the reply subject, or {@code null} for none
     * @param bArr the payload
     * @throws IOException propagated from the delegate
     */
    @Override // io.nats.client.Connection
    public void publish(String str, String str2, byte[] bArr) throws IOException {
        final boolean flag = false;
        publish(str, str2, bArr, flag);
    }

    /**
     * Publishes a payload to a subject without a reply subject.
     *
     * @param str the subject
     * @param bArr the payload
     * @throws IOException propagated from the three-argument overload
     */
    @Override // io.nats.client.Connection
    public void publish(String str, byte[] bArr) throws IOException {
        // A null reply subject means "no reply expected".
        publish(str, null, bArr);
    }

    /**
     * Publishes an already-built message using its subject bytes, reply-to bytes and data.
     * The boolean flag of the byte-array overload is passed as {@code false}.
     *
     * @param message the message to publish
     * @throws IOException propagated from the byte-array overload
     */
    @Override // io.nats.client.Connection
    public void publish(Message message) throws IOException {
        final byte[] subject = message.getSubjectBytes();
        final byte[] replyTo = message.getReplyToBytes();
        final byte[] payload = message.getData();
        publish(subject, replyTo, payload, false);
    }

    /**
     * Sends a request and waits for a single reply.
     *
     * <p>A fresh inbox subject is created and subscribed to, the subscription is set to
     * auto-unsubscribe after one message, the request is published with the inbox as its
     * reply subject, and the next message on the inbox is returned. The subscription is
     * always closed before this method returns or throws.
     *
     * @param str the subject to send the request to
     * @param bArr the request payload
     * @param j the timeout value handed to {@code nextMessage}
     * @param timeUnit the unit of {@code j}
     * @return the message returned by {@code nextMessage} on the inbox subscription
     * @throws IOException propagated from subscribing, publishing or receiving
     * @throws InterruptedException propagated from the blocking receive
     */
    @Override // io.nats.client.Connection
    public Message request(String str, byte[] bArr, long j, TimeUnit timeUnit) throws IOException, InterruptedException {
        String newInbox = newInbox();
        // try-with-resources closes the subscription on every path and records a failing
        // close() as a suppressed exception instead of letting it mask the primary failure.
        try (SyncSubscription syncSubscription = (SyncSubscription) subscribe(newInbox, null, null, createMsgChannel(8))) {
            // Exactly one reply is expected, so unsubscribe after the first message.
            syncSubscription.autoUnsubscribe(1);
            publish(str, newInbox, bArr);
            return syncSubscription.nextMessage(j, timeUnit);
        }
    }

    /**
     * Sends a request and waits for a reply, with the timeout expressed in milliseconds.
     *
     * @param str the subject to send the request to
     * @param bArr the request payload
     * @param j the timeout in milliseconds
     * @return the reply message from the four-argument overload
     * @throws IOException propagated from the delegate
     * @throws InterruptedException propagated from the delegate
     */
    @Override // io.nats.client.Connection
    public Message request(String str, byte[] bArr, long j) throws IOException, InterruptedException {
        final TimeUnit unit = TimeUnit.MILLISECONDS;
        return request(str, bArr, j, unit);
    }

    /**
     * Sends a request and waits for a reply, passing a timeout of {@code -1} milliseconds
     * to the four-argument overload.
     *
     * @param str the subject to send the request to
     * @param bArr the request payload
     * @return the reply message from the four-argument overload
     * @throws IOException propagated from the delegate
     * @throws InterruptedException propagated from the delegate
     */
    @Override // io.nats.client.Connection
    public Message request(String str, byte[] bArr) throws IOException, InterruptedException {
        // NOTE(review): -1 presumably means "wait indefinitely" - confirm in nextMessage.
        final long timeout = -1L;
        return request(str, bArr, timeout, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a new inbox subject: the inbox prefix followed by the next global NUID value.
     *
     * @return the generated inbox subject
     */
    @Override // io.nats.client.AbstractConnection
    public String newInbox() {
        return INBOX_PREFIX + NUID.nextGlobal();
    }

    /**
     * Returns a new {@code Statistics} object constructed from this connection's current
     * statistics. Runs while holding this object's monitor.
     *
     * @return a newly constructed statistics object
     */
    @Override // io.nats.client.AbstractConnection
    public synchronized Statistics getStats() {
        final Statistics snapshot = new Statistics(stats);
        return snapshot;
    }

    /**
     * Clears this connection's statistics while holding this object's monitor.
     */
    @Override // io.nats.client.AbstractConnection
    public synchronized void resetStats() {
        stats.clear();
    }

    /**
     * Returns the maximum payload value reported by the current server info object.
     * Runs while holding this object's monitor.
     *
     * @return the max payload value from the server info
     */
    @Override // io.nats.client.AbstractConnection
    public synchronized long getMaxPayload() {
        final long maxPayload = info.getMaxPayload();
        return maxPayload;
    }

    /**
     * Writes the subscribe protocol line for a subscription to the output stream.
     *
     * <p>The line is built from {@code SUB_PROTO} using the subscription's subject, an
     * optional queue part (a separator followed by the queue name, or empty when the
     * subscription has no queue) and the subscription id. This is best-effort: an
     * {@link IOException} during the write is logged, including its cause, and not rethrown.
     *
     * @param subscriptionImpl the subscription to announce
     */
    void sendSubscriptionMessage(SubscriptionImpl subscriptionImpl) {
        String queue = subscriptionImpl.getQueue();
        String queuePart = (queue == null || queue.isEmpty()) ? _EMPTY_ : _SPC_ + queue;
        String protoLine = String.format(SUB_PROTO, subscriptionImpl.getSubject(), queuePart, Long.valueOf(subscriptionImpl.getSid()));
        try {
            this.bw.write(protoLine.getBytes());
        } catch (IOException e) {
            // Keep the best-effort behavior but attach the exception so the failure is diagnosable.
            this.logger.warn("nats: I/O exception while sending subscription message", e);
        }
    }

    /**
     * Returns the closed callback from the options, read while holding the connection lock.
     *
     * @return the closed callback reported by the options
     */
    @Override // io.nats.client.AbstractConnection
    public ClosedCallback getClosedCallback() {
        final ClosedCallback callback;
        mu.lock();
        try {
            callback = opts.getClosedCallback();
        } finally {
            mu.unlock();
        }
        return callback;
    }

    /**
     * Stores the closed callback on the options while holding the connection lock.
     *
     * @param closedCallback the callback to store
     */
    @Override // io.nats.client.AbstractConnection
    public void setClosedCallback(ClosedCallback closedCallback) {
        mu.lock();
        try {
            opts.closedCb = closedCallback;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the disconnected callback from the options, read while holding the
     * connection lock.
     *
     * @return the disconnected callback reported by the options
     */
    @Override // io.nats.client.AbstractConnection
    public DisconnectedCallback getDisconnectedCallback() {
        final DisconnectedCallback callback;
        mu.lock();
        try {
            callback = opts.getDisconnectedCallback();
        } finally {
            mu.unlock();
        }
        return callback;
    }

    /**
     * Stores the disconnected callback on the options while holding the connection lock.
     *
     * @param disconnectedCallback the callback to store
     */
    @Override // io.nats.client.AbstractConnection
    public void setDisconnectedCallback(DisconnectedCallback disconnectedCallback) {
        mu.lock();
        try {
            opts.disconnectedCb = disconnectedCallback;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the reconnected callback from the options, read while holding the
     * connection lock.
     *
     * @return the reconnected callback reported by the options
     */
    @Override // io.nats.client.AbstractConnection
    public ReconnectedCallback getReconnectedCallback() {
        final ReconnectedCallback callback;
        mu.lock();
        try {
            callback = opts.getReconnectedCallback();
        } finally {
            mu.unlock();
        }
        return callback;
    }

    /**
     * Stores the reconnected callback on the options while holding the connection lock.
     *
     * @param reconnectedCallback the callback to store
     */
    @Override // io.nats.client.AbstractConnection
    public void setReconnectedCallback(ReconnectedCallback reconnectedCallback) {
        mu.lock();
        try {
            opts.reconnectedCb = reconnectedCallback;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the exception handler from the options, read while holding the connection lock.
     *
     * @return the exception handler reported by the options
     */
    @Override // io.nats.client.AbstractConnection
    public ExceptionHandler getExceptionHandler() {
        final ExceptionHandler handler;
        mu.lock();
        try {
            handler = opts.getExceptionHandler();
        } finally {
            mu.unlock();
        }
        return handler;
    }

    /**
     * Stores the exception handler in the options' {@code asyncErrorCb} field while
     * holding the connection lock.
     *
     * @param exceptionHandler the handler to store
     */
    @Override // io.nats.client.AbstractConnection
    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
        mu.lock();
        try {
            opts.asyncErrorCb = exceptionHandler;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the current URL as a string, but only while the connection status is
     * {@code CONNECTED}. The status check and URL read happen under the connection lock.
     *
     * @return the string form of the current URL, or {@code null} when not connected
     */
    @Override // io.nats.client.AbstractConnection
    public String getConnectedUrl() {
        mu.lock();
        try {
            return (status == Nats.ConnState.CONNECTED) ? getUrl().toString() : null;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the id from the current server info, but only while the connection status is
     * {@code CONNECTED}. The check and read happen under the connection lock.
     *
     * @return the server id, or {@code null} when not connected
     */
    @Override // io.nats.client.AbstractConnection
    public String getConnectedServerId() {
        mu.lock();
        try {
            return (status == Nats.ConnState.CONNECTED) ? info.getId() : null;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the current connection status, read while holding the connection lock.
     *
     * @return the current status
     */
    @Override // io.nats.client.AbstractConnection
    public Nats.ConnState getState() {
        final Nats.ConnState current;
        mu.lock();
        try {
            current = status;
        } finally {
            mu.unlock();
        }
        return current;
    }

    /**
     * Returns the stored server info object. The field is read without taking the
     * connection lock.
     *
     * @return the current server info, as stored
     */
    @Override // io.nats.client.AbstractConnection
    public ServerInfo getConnectedServerInfo() {
        return info;
    }

    /**
     * Replaces the stored server info object.
     *
     * @param serverInfo the server info to store
     */
    void setConnectedServerInfo(ServerInfo serverInfo) {
        info = serverInfo;
    }

    /**
     * Returns the exception most recently stored via {@code setLastError}.
     *
     * @return the stored exception, as stored
     */
    @Override // io.nats.client.AbstractConnection
    public Exception getLastException() {
        return lastEx;
    }

    /**
     * Records an exception as this connection's last error.
     *
     * @param exc the exception to store
     */
    void setLastError(Exception exc) {
        lastEx = exc;
    }

    /**
     * Returns the options object used by this connection.
     *
     * @return the stored options
     */
    Options getOptions() {
        return opts;
    }

    /**
     * Replaces the options object used by this connection.
     *
     * @param options the options to store
     */
    void setOptions(Options options) {
        opts = options;
    }

    /**
     * Replaces the pending-bytes buffer.
     *
     * @param byteArrayOutputStream the buffer to store; may be {@code null}
     */
    void setPending(ByteArrayOutputStream byteArrayOutputStream) {
        pending = byteArrayOutputStream;
    }

    /**
     * Returns the pending-bytes buffer.
     *
     * @return the stored buffer, as stored
     */
    ByteArrayOutputStream getPending() {
        return pending;
    }

    /**
     * Replaces the output stream this connection writes to, while holding the
     * connection lock.
     *
     * @param outputStream the stream to store
     */
    void setOutputStream(OutputStream outputStream) {
        mu.lock();
        try {
            bw = outputStream;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the output stream this connection writes to. The field is read without
     * taking the connection lock.
     *
     * @return the stored output stream
     */
    OutputStream getOutputStream() {
        return bw;
    }

    /**
     * Replaces the input stream this connection reads from, while holding the
     * connection lock.
     *
     * @param inputStream the stream to store
     */
    void setInputStream(InputStream inputStream) {
        mu.lock();
        try {
            br = inputStream;
        } finally {
            mu.unlock();
        }
    }

    /**
     * Returns the input stream this connection reads from. The field is read without
     * taking the connection lock.
     *
     * @return the stored input stream
     */
    InputStream getInputStream() {
        return br;
    }

    /**
     * Returns the list of pong queues. The live list is returned, not a copy.
     *
     * @return the stored list of pong queues
     */
    List<BlockingQueue<Boolean>> getPongs() {
        return pongs;
    }

    /**
     * Replaces the list of pong queues. The given list is stored as-is, not copied.
     *
     * @param list the list to store
     */
    void setPongs(List<BlockingQueue<Boolean>> list) {
        pongs = list;
    }

    /**
     * Returns the subscription map, keyed by {@code Long}. The live map is returned,
     * not a copy.
     *
     * @return the stored subscription map
     */
    Map<Long, SubscriptionImpl> getSubs() {
        return subs;
    }

    /**
     * Replaces the subscription map. The given map is stored as-is, not copied.
     *
     * @param map the map to store
     */
    void setSubs(Map<Long, SubscriptionImpl> map) {
        subs = map;
    }

    /**
     * Returns the server pool. The live list is returned, not a copy.
     *
     * @return the stored server pool
     */
    List<Srv> getServerPool() {
        return srvPool;
    }

    /**
     * Replaces the server pool. The given list is stored as-is, not copied.
     *
     * @param list the server pool to store
     */
    void setServerPool(List<Srv> list) {
        srvPool = list;
    }

    /**
     * Returns the number of bytes currently held in the pending buffer.
     *
     * <p>The buffer reference is read once, so a concurrent {@code setPending(null)}
     * between the null check and the size query cannot cause a
     * {@link NullPointerException}.
     *
     * @return the size of the pending buffer, or {@code 0} when there is no pending buffer
     */
    @Override // io.nats.client.AbstractConnection
    public int getPendingByteCount() {
        final ByteArrayOutputStream buffered = getPending();
        return (buffered == null) ? 0 : buffered.size();
    }

    /**
     * Replaces the flush channel queue.
     *
     * @param blockingQueue the queue to store
     */
    protected void setFlushChannel(BlockingQueue<Boolean> blockingQueue) {
        fch = blockingQueue;
    }

    /**
     * Returns the flush channel queue.
     *
     * @return the stored queue
     */
    protected BlockingQueue<Boolean> getFlushChannel() {
        return fch;
    }

    /**
     * Replaces the TCP connection object used by this connection.
     *
     * @param tcpConnection the TCP connection to store
     */
    void setTcpConnection(TcpConnection tcpConnection) {
        conn = tcpConnection;
    }

    /**
     * Returns the TCP connection object used by this connection.
     *
     * @return the stored TCP connection
     */
    TcpConnection getTcpConnection() {
        return conn;
    }

    /**
     * Replaces the TCP connection factory.
     *
     * @param tcpConnectionFactory the factory to store
     */
    void setTcpConnectionFactory(TcpConnectionFactory tcpConnectionFactory) {
        tcf = tcpConnectionFactory;
    }

    /**
     * Returns the TCP connection factory.
     *
     * @return the stored factory
     */
    TcpConnectionFactory getTcpConnectionFactory() {
        return tcf;
    }

    /**
     * Returns the URL currently stored on this connection.
     *
     * @return the stored URL
     */
    URI getUrl() {
        return url;
    }

    /**
     * Replaces the URL stored on this connection.
     *
     * @param uri the URL to store
     */
    void setUrl(URI uri) {
        url = uri;
    }

    /**
     * Returns the stored outstanding-pings counter.
     *
     * @return the current counter value
     */
    int getActualPingsOutstanding() {
        // NOTE(review): presumably the number of pings still awaiting a pong - confirm.
        return pout;
    }

    /**
     * Overwrites the stored outstanding-pings counter.
     *
     * @param i the new counter value
     */
    void setActualPingsOutstanding(int i) {
        pout = i;
    }

    /**
     * Returns the scheduled future stored as the ping timer.
     *
     * @return the stored ping timer future
     */
    ScheduledFuture<?> getPingTimer() {
        return ptmr;
    }

    /**
     * Replaces the scheduled future stored as the ping timer.
     *
     * @param scheduledFuture the ping timer future to store
     */
    void setPingTimer(ScheduledFuture<?> scheduledFuture) {
        ptmr = scheduledFuture;
    }

    /**
     * Replaces the parser used by this connection.
     *
     * @param parser the parser to store
     */
    void setParser(Parser parser) {
        this.parser = parser;
    }

    /**
     * Returns the parser used by this connection.
     *
     * @return the stored parser
     */
    Parser getParser() {
        return parser;
    }

    /**
     * Formats the servers in the pool as {@code scheme://host:port} strings.
     *
     * <p>This method does not take the connection lock itself.
     *
     * @param z when {@code true}, only servers whose {@code isImplicit()} returns
     *     {@code true} are included; when {@code false}, every server in the pool is included
     * @return the formatted server URLs, in pool iteration order; never {@code null}
     */
    String[] getServers(boolean z) {
        List<String> formatted = new ArrayList<>(this.srvPool.size());
        for (Srv srv : this.srvPool) {
            if (z && !srv.isImplicit()) {
                continue;
            }
            URI uri = srv.url;
            formatted.add(String.format("%s://%s:%d", uri.getScheme(), uri.getHost(), uri.getPort()));
        }
        return formatted.toArray(new String[0]);
    }

    /**
     * Returns every server in the pool as a {@code scheme://host:port} string, computed
     * while holding the connection lock.
     *
     * @return the formatted URLs of all servers in the pool
     */
    @Override // io.nats.client.AbstractConnection
    public String[] getServers() {
        final String[] all;
        mu.lock();
        try {
            all = getServers(false);
        } finally {
            mu.unlock();
        }
        return all;
    }

    /**
     * Returns only the servers in the pool that are marked implicit, as
     * {@code scheme://host:port} strings, computed while holding the connection lock.
     *
     * @return the formatted URLs of the implicit servers
     */
    @Override // io.nats.client.AbstractConnection
    public String[] getDiscoveredServers() {
        final String[] discovered;
        mu.lock();
        try {
            discovered = getServers(true);
        } finally {
            mu.unlock();
        }
        return discovered;
    }

    /**
     * Reports whether the current server info says authentication is required, read
     * while holding the connection lock.
     *
     * @return the auth-required flag from the server info
     */
    @Override // io.nats.client.AbstractConnection
    public boolean isAuthRequired() {
        final boolean required;
        mu.lock();
        try {
            required = info.isAuthRequired();
        } finally {
            mu.unlock();
        }
        return required;
    }

    /**
     * Reports whether the current server info says TLS is required, read while holding
     * the connection lock.
     *
     * @return the TLS-required flag from the server info
     */
    @Override // io.nats.client.AbstractConnection
    public boolean isTlsRequired() {
        final boolean required;
        mu.lock();
        try {
            required = info.isTlsRequired();
        } finally {
            mu.unlock();
        }
        return required;
    }
}
