/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.DynamicMembership;
import org.jgroups.protocols.raft.InternalCommand;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.RaftLeaderException;
import org.jgroups.raft.Options;
import org.jgroups.raft.Settable;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Bits;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Util;

@MBean(description="Redirects requests to current leader")
public class REDIRECT
extends Protocol
implements Settable,
DynamicMembership {
    protected static final short REDIRECT_ID = 522;
    protected static final short REDIRECT_HDR = 4000;
    protected RAFT raft;
    protected volatile View view;
    protected final AtomicInteger request_ids = new AtomicInteger(1);
    protected final Map<Integer, CompletableFuture<byte[]>> requests = new HashMap<Integer, CompletableFuture<byte[]>>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, Options options) throws Exception {
        Address leader = this.leader("set()");
        if (Objects.equals(this.local_addr, leader)) {
            return this.raft.setAsync(buf, offset, length);
        }
        int req_id = this.request_ids.getAndIncrement();
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        Map<Integer, CompletableFuture<byte[]>> map = this.requests;
        synchronized (map) {
            this.requests.put(req_id, future);
        }
        this.log.trace("%s: redirecting request %d to leader %s", this.local_addr, req_id, leader);
        Message redirect = new BytesMessage(leader, buf, offset, length).putHeader(this.id, new RedirectHeader(RequestType.REQ, req_id, false).options(options));
        this.down_prot.down(redirect);
        return future;
    }

    @Override
    public CompletableFuture<byte[]> addServer(String name) throws Exception {
        return this.changeServer(name, true);
    }

    @Override
    public CompletableFuture<byte[]> removeServer(String name) throws Exception {
        return this.changeServer(name, false);
    }

    @Override
    public void init() throws Exception {
        super.init();
        this.raft = RAFT.findProtocol(RAFT.class, this, true);
        if (this.raft == null) {
            throw new IllegalStateException("RAFT protocol not found");
        }
    }

    @Override
    public Object up(Event evt) {
        if (evt.getType() == 6) {
            this.view = (View)evt.getArg();
        }
        return this.up_prot.up(evt);
    }

    @Override
    public Object up(Message msg) {
        RedirectHeader hdr = (RedirectHeader)msg.getHeader(this.id);
        if (hdr != null) {
            this.handleEvent(msg, hdr);
            return null;
        }
        return this.up_prot.up(msg);
    }

    @Override
    public void up(MessageBatch batch) {
        Iterator<Message> it = batch.iterator();
        while (it.hasNext()) {
            Message msg = it.next();
            RedirectHeader hdr = (RedirectHeader)msg.getHeader(this.id);
            if (hdr == null) continue;
            it.remove();
            this.handleEvent(msg, hdr);
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleEvent(Message msg, RedirectHeader hdr) {
        Address sender = msg.src();
        Options opts = hdr.options();
        switch (hdr.type.ordinal()) {
            case 0: {
                this.log.trace("%s: received redirected request %d from %s", this.local_addr, hdr.corr_id, sender);
                ResponseHandler rsp_handler = new ResponseHandler(sender, hdr.corr_id, opts);
                try {
                    this.raft.setAsync(msg.getArray(), msg.getOffset(), msg.getLength(), opts).whenComplete((BiConsumer)rsp_handler);
                }
                catch (Throwable t) {
                    rsp_handler.apply(t);
                }
                break;
            }
            case 3: {
                CompletableFuture<byte[]> future = null;
                Map<Integer, CompletableFuture<byte[]>> map = this.requests;
                synchronized (map) {
                    future = this.requests.remove(hdr.corr_id);
                }
                if (future == null) break;
                this.log.trace("%s: received response for redirected request %d from %s", this.local_addr, hdr.corr_id, sender);
                if (!hdr.exception) {
                    if (opts.ignoreReturnValue()) {
                        future.complete(null);
                        break;
                    }
                    future.complete(msg.getArray());
                    break;
                }
                try {
                    Throwable t = (Throwable)Util.objectFromByteBuffer(msg.getArray());
                    future.completeExceptionally(t);
                }
                catch (Exception e) {
                    this.log.error("failed deserializing exception", e);
                }
                break;
            }
            case 1: 
            case 2: {
                ResponseHandler rsp_handler = new ResponseHandler(sender, hdr.corr_id, Options.create(true));
                InternalCommand.Type type = hdr.type == RequestType.ADD_SERVER ? InternalCommand.Type.addServer : InternalCommand.Type.removeServer;
                try {
                    this.raft.changeMembers(new String(msg.getArray(), msg.getOffset(), msg.getLength()), type).whenComplete((BiConsumer)rsp_handler);
                }
                catch (Throwable t) {
                    rsp_handler.apply(t);
                }
                break;
            }
            default: {
                this.log.error("type %d not known", new Object[]{hdr.type});
            }
        }
    }

    protected Address leader(String req_type) throws RaftLeaderException {
        Address leader = this.raft.leader();
        if (leader == null) {
            throw new RaftLeaderException(String.format("there is currently no leader to forward %s request to", req_type));
        }
        if (this.view != null && !this.view.containsMember(leader)) {
            throw new RaftLeaderException("leader " + String.valueOf(leader) + " is not member of view " + String.valueOf(this.view));
        }
        return leader;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected CompletableFuture<byte[]> changeServer(String name, boolean add) throws Exception {
        Address leader = this.leader("addServer()/removeServer()");
        if (Objects.equals(this.local_addr, leader)) {
            return this.raft.changeMembers(name, add ? InternalCommand.Type.addServer : InternalCommand.Type.removeServer);
        }
        int req_id = this.request_ids.getAndIncrement();
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        Map<Integer, CompletableFuture<byte[]>> map = this.requests;
        synchronized (map) {
            this.requests.put(req_id, future);
        }
        this.log.trace("%s: redirecting request %d to leader %s", this.local_addr, req_id, leader);
        byte[] buffer = Util.stringToBytes(name);
        Message redirect = new BytesMessage(leader, buffer).putHeader(this.id, new RedirectHeader(add ? RequestType.ADD_SERVER : RequestType.REMOVE_SERVER, req_id, false));
        this.down_prot.down(redirect);
        return future;
    }

    static {
        ClassConfigurator.addProtocol((short)522, REDIRECT.class);
        ClassConfigurator.add((short)4000, RedirectHeader.class);
    }

    public static class RedirectHeader
    extends Header {
        protected RequestType type;
        protected int corr_id;
        protected boolean exception;
        protected Options options = new Options();

        public RedirectHeader() {
        }

        public RedirectHeader(RequestType type, int corr_id, boolean exception) {
            this.type = type;
            this.corr_id = corr_id;
            this.exception = exception;
        }

        @Override
        public short getMagicId() {
            return 4000;
        }

        @Override
        public Supplier<? extends Header> create() {
            return RedirectHeader::new;
        }

        @Override
        public int serializedSize() {
            return 2 + Bits.size(this.corr_id) + 1;
        }

        public RedirectHeader options(Options opts) {
            if (opts != null && !this.options.equals(opts)) {
                this.options = opts;
            }
            return this;
        }

        public Options options() {
            return this.options;
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
            out.writeByte((byte)this.type.ordinal());
            Bits.writeIntCompressed(this.corr_id, out);
            out.writeBoolean(this.exception);
            this.options.writeTo(out);
        }

        @Override
        public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
            this.type = RequestType.values()[in.readByte()];
            this.corr_id = Bits.readIntCompressed(in);
            this.exception = in.readBoolean();
            this.options.readFrom(in);
        }

        @Override
        public String toString() {
            return this.type.toString() + ", corr_id=" + this.corr_id + ", exception=" + this.exception;
        }
    }

    public static enum RequestType {
        REQ,
        ADD_SERVER,
        REMOVE_SERVER,
        RSP;

    }

    protected class ResponseHandler
    implements BiConsumer<byte[], Throwable> {
        protected final Address dest;
        protected final int corr_id;
        protected final Options options;

        public ResponseHandler(Address dest, int corr_id, Options opts) {
            this.dest = dest;
            this.corr_id = corr_id;
            this.options = opts;
        }

        @Override
        public void accept(byte[] buf, Throwable ex) {
            if (ex != null) {
                this.apply(ex);
            } else {
                this.apply(buf);
            }
        }

        protected void apply(byte[] arg) {
            if (arg != null && this.options != null && this.options.ignoreReturnValue()) {
                arg = null;
            }
            Message msg = new BytesMessage(this.dest, arg).putHeader(REDIRECT.this.id, new RedirectHeader(RequestType.RSP, this.corr_id, false).options(this.options));
            REDIRECT.this.down_prot.down(msg);
        }

        protected void apply(Throwable t) {
            try {
                byte[] buf = Util.objectToByteBuffer(t);
                Message msg = new BytesMessage(this.dest, buf).putHeader(REDIRECT.this.id, new RedirectHeader(RequestType.RSP, this.corr_id, true));
                REDIRECT.this.down_prot.down(msg);
            }
            catch (Exception ex) {
                REDIRECT.this.log.error("failed serializing exception", ex);
            }
        }
    }
}

