/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.DynamicMembership;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.raft.Settable;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;

/**
 * Protocol which opens a server socket and serves client requests read from it.
 *
 * <p>An acceptor thread (this class' {@link #run()}) accepts connections and hands each one to a
 * {@link RequestHandler} running on a thread pool. A request is read as
 * {@code [byte type][int request_id][int length][length bytes]}, where {@code type} is the ordinal of a
 * {@link RequestType}. The payload is passed to the {@link Settable} or {@link DynamicMembership}
 * found in the stack, and the outcome is written back as a {@link RequestType#rsp} frame with the same
 * layout, after which the connection is closed.
 */
@MBean(description="Listens on a socket for client requests, forwards them to the leader and send responses")
public class CLIENT
extends Protocol
implements Runnable {
    protected static final short CLIENT_ID = 523;
    /** Empty payload sent when a request completes with a {@code null} result. */
    protected static final byte[] BUF = new byte[0];

    static {
        ClassConfigurator.addProtocol(CLIENT_ID, CLIENT.class);
    }

    @Property(name="bind_addr", description="The bind address which should be used by the server socket. The following special values are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address", systemProperty={"jgroups.bind_addr"}, writable=false)
    protected InetAddress bind_addr;
    @Property(description="Port to listen for client requests", writable=false)
    protected int port = 1965;
    @Property(description="The min threads in the thread pool")
    protected int min_threads;
    @Property(description="Max number of threads in the thread pool")
    protected int max_threads = 100;
    @Property(description="Number of ms a thread can be idle before being removed from the thread pool", type=AttributeType.TIME)
    protected long idle_time = 5000L;
    @Property(description="Number of bytes of the server socket's receive buffer", type=AttributeType.BYTES)
    protected int recv_buf_size;
    /** Target of {@link RequestType#set_req} requests; resolved in {@link #init()}. */
    protected Settable settable;
    /** Target of add/remove-server requests; resolved in {@link #init()}. */
    protected DynamicMembership dyn_membership;
    /** Server socket created in {@link #start()}. */
    protected ServerSocket sock;
    /** Pool running one {@link RequestHandler} per accepted connection. */
    protected ExecutorService thread_pool;
    /** Thread executing {@link #run()}. */
    protected Thread acceptor;

    public InetAddress getBindAddress() {
        return this.bind_addr;
    }

    public CLIENT setBindAddress(InetAddress b) {
        this.bind_addr = b;
        return this;
    }

    public int getPort() {
        return this.port;
    }

    public CLIENT setPort(int p) {
        this.port = p;
        return this;
    }

    public int getMinThreads() {
        return this.min_threads;
    }

    public CLIENT setMinThreads(int t) {
        this.min_threads = t;
        return this;
    }

    public int getMaxThreads() {
        return this.max_threads;
    }

    public CLIENT setMaxThreads(int t) {
        this.max_threads = t;
        return this;
    }

    public long getIdleTime() {
        return this.idle_time;
    }

    public CLIENT setIdleTime(long t) {
        this.idle_time = t;
        return this;
    }

    public int getReceiveBufferSize() {
        return this.recv_buf_size;
    }

    public CLIENT setReceiveBufferSize(int s) {
        this.recv_buf_size = s;
        return this;
    }

    /**
     * Looks up the {@link Settable} and {@link DynamicMembership} implementations via
     * {@code RAFT.findProtocol}.
     *
     * @throws IllegalStateException if either lookup returns {@code null}
     */
    @Override
    public void init() throws Exception {
        super.init();
        this.settable = RAFT.findProtocol(Settable.class, this, true);
        if (this.settable == null) {
            throw new IllegalStateException("did not find a protocol implementing Settable (e.g. REDIRECT or RAFT)");
        }
        this.dyn_membership = RAFT.findProtocol(DynamicMembership.class, this, true);
        if (this.dyn_membership == null) {
            throw new IllegalStateException("did not find a protocol implementing DynamicMembership (e.g. REDIRECT or RAFT)");
        }
    }

    /**
     * Creates the server socket (port range {@code port} to {@code port + 50} is passed to
     * {@code Util.createServerSocket}), the request thread pool and the acceptor thread.
     *
     * @throws IllegalStateException if no server socket was created
     */
    @Override
    public void start() throws Exception {
        super.start();
        // NOTE(review): the service name reads "CLIENR" - probably a typo of "CLIENT". Left unchanged
        // because socket factories may key on this string.
        this.sock = Util.createServerSocket(this.getSocketFactory(), "CLIENR.srv_sock", this.bind_addr, this.port, this.port + 50, this.recv_buf_size);
        if (this.sock == null) {
            throw new IllegalStateException(String.format("failed creating server socket at %s:%d", this.bind_addr, this.port));
        }
        // The SynchronousQueue does not buffer tasks, so a task is rejected when no thread can take it.
        // A rejected handler never runs, so its socket is closed here rather than being left open.
        this.thread_pool = new ThreadPoolExecutor(this.min_threads, this.max_threads, this.idle_time, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>(), this.getThreadFactory(), (task, pool) -> {
            if (task instanceof RequestHandler) {
                Util.close((Closeable)((RequestHandler)task).client_sock);
            }
            this.log.warn("request thread pool rejected a client connection; connection closed");
        });
        this.acceptor = new Thread((Runnable)this, "CLIENT.Acceptor");
        this.acceptor.start();
    }

    /** Closes the server socket (which ends the acceptor loop) and shuts the thread pool down. */
    @Override
    public void stop() {
        super.stop();
        this.releaseResources();
    }

    /** Performs the same cleanup as {@link #stop()}, in case the protocol is destroyed without being stopped. */
    @Override
    public void destroy() {
        super.destroy();
        this.releaseResources();
    }

    /** Closes the server socket and initiates an orderly shutdown of the thread pool, if one was created. */
    private void releaseResources() {
        Util.close((Closeable)this.sock);
        if (this.thread_pool != null) {
            this.thread_pool.shutdown();
        }
    }

    /**
     * Acceptor loop: accepts connections and submits a {@link RequestHandler} for each. Returns once
     * {@code accept()} fails with an {@link IOException} and the server socket is closed; any other
     * failure is logged and the loop continues.
     */
    @Override
    public void run() {
        for (;;) {
            Socket client_sock = null;
            try {
                client_sock = this.sock.accept();
                this.thread_pool.execute(new RequestHandler(client_sock));
            }
            catch (IOException ignored) {
                // accept() failing on a closed server socket is the shutdown signal; otherwise retry
                if (this.sock.isClosed()) {
                    return;
                }
            }
            catch (Throwable ex) {
                this.log.error("error accepting new connection", ex);
                // the connection was accepted but never handed to a handler: don't leave it open
                if (client_sock != null) {
                    Util.close((Closeable)client_sock);
                }
            }
        }
    }

    /** Passes the batch up unchanged. */
    @Override
    public void up(MessageBatch batch) {
        this.up_prot.up(batch);
    }

    /**
     * Reads a single request from a client connection and dispatches it. The reply is written by a
     * {@link CompletionHandler} registered on the future returned by the dispatched call.
     */
    protected class RequestHandler
    implements Runnable {
        protected final Socket client_sock;

        public RequestHandler(Socket client_sock) {
            this.client_sock = client_sock;
        }

        /**
         * Reads {@code [type][request_id][length][payload]} and invokes the operation selected by
         * {@code type}. Any failure is logged; if the request ID had already been read, the failure is
         * also sent back to the client. The connection is closed on failure.
         */
        @Override
        public void run() {
            DataInputStream in = null;
            DataOutputStream out = null;
            CompletionHandler completion_handler = null;
            try {
                in = new DataInputStream(this.client_sock.getInputStream());
                out = new DataOutputStream(this.client_sock.getOutputStream());
                RequestType[] known_types = RequestType.values();
                int type_index = in.readByte();
                if (type_index < 0 || type_index >= known_types.length) {
                    throw new IllegalArgumentException("invalid request type " + type_index);
                }
                RequestType type = known_types[type_index];
                int request_id = in.readInt();
                completion_handler = new CompletionHandler(this.client_sock, in, out, request_id);
                // NOTE(review): length comes straight off the wire and is not bounded here; a negative
                // or huge value fails the allocation and is handled by the catch block below.
                int length = in.readInt();
                byte[] buffer = new byte[length];
                in.readFully(buffer);
                switch (type) {
                    case set_req:
                        CLIENT.this.settable.setAsync(buffer, 0, buffer.length).whenComplete(completion_handler);
                        break;
                    case add_server:
                        CLIENT.this.dyn_membership.addServer(Util.bytesToString(buffer)).whenComplete(completion_handler);
                        break;
                    case remove_server:
                        CLIENT.this.dyn_membership.removeServer(Util.bytesToString(buffer)).whenComplete(completion_handler);
                        break;
                    default:
                        // no operation is defined for this type: fail the request so the client gets
                        // a reply and the connection is closed instead of being left open
                        throw new IllegalArgumentException("unsupported request type " + type);
                }
            }
            catch (Throwable ex) {
                CLIENT.this.log.error("failed handling request", ex);
                if (completion_handler != null) {
                    completion_handler.accept(null, ex);
                }
                Util.close(in, out, this.client_sock);
            }
        }

        /**
         * Writes one frame: {@code [byte type ordinal][int request_id][int len][len bytes]}.
         * {@code len} is 0 when {@code buffer} is {@code null}, otherwise {@code length}.
         *
         * @param out        the stream to write to
         * @param type       the frame type; its ordinal is written as a single byte
         * @param request_id the ID echoed back to the client
         * @param buffer     the payload, may be {@code null}
         * @param offset     the start of the payload in {@code buffer}
         * @param length     the number of payload bytes
         * @throws Exception if writing to {@code out} fails
         */
        protected void send(DataOutput out, RequestType type, int request_id, byte[] buffer, int offset, int length) throws Exception {
            out.writeByte((byte)type.ordinal());
            out.writeInt(request_id);
            int len = buffer == null ? 0 : length;
            out.writeInt(len);
            if (len > 0) {
                out.write(buffer, offset, len);
            }
        }

        /**
         * Callback which sends the result (or failure) of a request back to the client and then closes
         * the connection.
         */
        protected class CompletionHandler
        implements BiConsumer<byte[], Throwable> {
            protected final Socket s;
            protected final DataInputStream input;
            protected final DataOutputStream output;
            protected final int req_id;

            public CompletionHandler(Socket client_sock, DataInputStream input, DataOutputStream output, int req) {
                this.s = client_sock;
                this.input = input;
                this.output = output;
                this.req_id = req;
            }

            /**
             * Sends a {@link RequestType#rsp} frame: the serialized {@code ex} if it is non-null,
             * otherwise {@code buf} (an empty payload if {@code buf} is {@code null}). A failure to
             * send is logged, not rethrown. The streams and the socket are always closed afterwards.
             *
             * @param buf the result of the request, may be {@code null}
             * @param ex  the failure of the request, or {@code null} on success
             */
            @Override
            public void accept(byte[] buf, Throwable ex) {
                try {
                    byte[] payload;
                    if (ex != null) {
                        payload = Util.objectToByteBuffer(ex);
                    } else {
                        payload = buf != null ? buf : BUF;
                    }
                    RequestHandler.this.send(this.output, RequestType.rsp, this.req_id, payload, 0, payload.length);
                }
                catch (Throwable t) {
                    CLIENT.this.log.error("failed in sending response to client", t);
                }
                finally {
                    Util.close(this.output, this.input, this.s);
                }
            }
        }
    }

    /**
     * Frame types. The ordinal is what is written to and read from the wire, so the declaration order
     * must not change.
     */
    public static enum RequestType {
        set_req,
        add_server,
        remove_server,
        type,
        rsp;

    }
}

