/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft;

import java.io.Closeable;
import java.io.File;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.function.ObjLongConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.jgroups.Address;
import org.jgroups.BytesMessage;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ObjectMessage;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.AppendEntriesRequest;
import org.jgroups.protocols.raft.AppendEntriesResponse;
import org.jgroups.protocols.raft.AppendResult;
import org.jgroups.protocols.raft.DynamicMembership;
import org.jgroups.protocols.raft.Follower;
import org.jgroups.protocols.raft.InstallSnapshotRequest;
import org.jgroups.protocols.raft.InternalCommand;
import org.jgroups.protocols.raft.Leader;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntries;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.PersistentState;
import org.jgroups.protocols.raft.RaftHeader;
import org.jgroups.protocols.raft.RaftImpl;
import org.jgroups.protocols.raft.Role;
import org.jgroups.protocols.raft.state.RaftState;
import org.jgroups.raft.Options;
import org.jgroups.raft.Settable;
import org.jgroups.raft.StateMachine;
import org.jgroups.raft.util.CommitTable;
import org.jgroups.raft.util.LogCache;
import org.jgroups.raft.util.RequestTable;
import org.jgroups.stack.Protocol;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.ByteArrayDataInputStream;
import org.jgroups.util.ByteArrayDataOutputStream;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.ExtendedUUID;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Runner;
import org.jgroups.util.Util;

@MBean(description="Implementation of the RAFT consensus protocol")
public class RAFT
extends Protocol
implements Settable,
DynamicMembership {
    // Key under which the raft-id is stored in an ExtendedUUID (read by print_function, written in init())
    public static final byte[] raft_id_key = Util.stringToBytes("raft-id");
    // Numeric IDs; where they are registered is not visible in this part of the file
    protected static final short RAFT_ID = 521;
    protected static final short APPEND_ENTRIES_REQ = 2000;
    protected static final short APPEND_ENTRIES_RSP = 2001;
    protected static final short APPEND_RESULT = 2002;
    protected static final short INSTALL_SNAPSHOT_REQ = 2003;
    protected static final short LOG_ENTRIES = 2004;
    // Renders an ExtendedUUID as its raft-id if one is stored under raft_id_key, else falls back to uuid.print()
    public static final Function<ExtendedUUID, String> print_function = uuid -> {
        byte[] val = uuid.get(raft_id_key);
        return val != null ? Util.bytesToString(val) : uuid.print();
    };
    @Property(description="The identifier of this node. Needs to be unique and an element of members. Must not be null", writable=false)
    protected String raft_id;
    // Holds the member list (see members()); also written to / read from snapshots
    protected final PersistentState internal_state = new PersistentState();
    // Holds leader, current term and voted-for (see leader(), currentTerm(), votedFor())
    protected final RaftState raft_state = new RaftState(this, this::leaderUpdated);
    @ManagedAttribute(description="Majority needed to achieve consensus; computed from members)")
    protected int majority = -1;
    @Property(description="If true, we can change 'members' at runtime")
    protected boolean dynamic_view_changes = true;
    @Property(description="The fully qualified name of the class implementing Log")
    protected String log_class = "org.jgroups.protocols.raft.LevelDBLog";
    @Property(description="Arguments to the log impl, e.g. k1=v1,k2=v2. These will be passed to init()")
    protected String log_args;
    @Property(description="The directory in which the log and snapshots are stored. Defaults to the temp dir")
    protected String log_dir = Util.checkForMac() ? File.separator + "tmp" : System.getProperty("java.io.tmpdir", File.separator + "tmp");
    @Property(description="The prefix of the log and snapshot. If null, the logical name of the channel is used as prefix")
    protected String log_prefix;
    @ManagedAttribute(description="The name of the log")
    protected String log_name;
    @Property(description="Interval (ms) at which AppendEntries messages are resent to members with missing log entries", type=AttributeType.TIME)
    protected long resend_interval = 1000L;
    @Property(description="Send commit message to followers immediately after leader commits (majority has consensus). Caution : it may generate more traffic than expected")
    protected boolean send_commits_immediately;
    @Property(description="Max number of bytes a log can have until a snapshot is created", type=AttributeType.BYTES)
    protected int max_log_size = 1000000;
    // Backing fields for the maxLogCacheSize() / logUseFsync() property accessors
    protected int _max_log_cache_size = 1024;
    protected boolean _log_use_fsync;
    @ManagedAttribute(description="The current size of the log in bytes", type=AttributeType.BYTES)
    protected long curr_log_size;
    @ManagedAttribute(description="Number of successful AppendEntriesRequests")
    protected int num_successful_append_requests;
    @ManagedAttribute(description="Number of snapshot messages received (by a follower)")
    protected int num_snapshot_received;
    @ManagedAttribute(description="Average AppendEntries batch size")
    protected AverageMinMax avg_append_entries_batch_size = new AverageMinMax();
    @ManagedAttribute(description="Number of failed AppendEntriesRequests because the entry wasn't found in the log")
    protected int num_failed_append_requests_not_found;
    @ManagedAttribute(description="Number of failed AppendEntriesRequests because the prev entry's term didn't match")
    protected int num_failed_append_requests_wrong_term;
    protected StateMachine state_machine;
    // Set once initStateMachineFromLog() has run, so that it is not applied twice
    protected boolean state_machine_loaded;
    // The log; created in start() from log_class unless set via log(Log), and possibly wrapped in a LogCache
    protected Log log_impl;
    protected RequestTable<String> request_table;
    protected CommitTable commit_table;
    protected final List<RoleChange> role_change_listeners = new ArrayList<RoleChange>();
    // The current role implementation; starts out as a Follower
    protected volatile RaftImpl impl = new Follower(this);
    protected volatile View view;
    @ManagedAttribute(description="Index of the highest log entry appended to the log", type=AttributeType.SCALAR)
    protected long last_appended;
    @ManagedAttribute(description="Index of the last committed log entry", type=AttributeType.SCALAR)
    protected long commit_index;
    @ManagedAttribute(description="The number of snapshots performed")
    protected int num_snapshots;
    @ManagedAttribute(description="The number of times AppendEntriesRequests were resent")
    protected int num_resends;
    // NOTE(review): 9182 looks like a transposition of 8192 - confirm before changing the default
    @Property(description="Max size in items the processing queue can have", type=AttributeType.SCALAR)
    protected int processing_queue_max_size = 9182;
    // Bounded queue of up/down requests; created in init() with capacity processing_queue_max_size
    protected BlockingQueue<Request> processing_queue;
    // Scratch list into which processQueue() drains processing_queue
    protected final List<Request> remove_queue = new ArrayList<Request>();
    // Created in init(); repeatedly invokes processQueue()
    protected Runner runner;
    // When true, requests are handled on the caller's thread instead of being queued (see up(Message), setAsync())
    protected boolean synchronous;
    protected CompletableFuture<byte[]> add_server_future = CompletableFuture.completedFuture(null);
    // Drain statistics, updated in processQueue() and cleared in resetStats()
    @ManagedAttribute
    final LongAdder drained_total = new LongAdder();
    @ManagedAttribute
    final AverageMinMax drained_avg = new AverageMinMax();
    @ManagedAttribute
    final LongAdder drained_down = new LongAdder();
    @ManagedAttribute
    final LongAdder drained_up = new LongAdder();

    /** Returns the number of requests currently held in the remove-queue. */
    @ManagedAttribute(description="Size of remove-queue")
    public int removeQueueSize() {
        final int size = remove_queue.size();
        return size;
    }

    /**
     * Returns the number of requests waiting in the processing queue.
     * The queue is only created in {@code init()}, so 0 is returned when it does not exist yet
     * (mirrors the null handling of {@code requestTableSize()}).
     */
    @ManagedAttribute(description="Size of processing queue")
    public int processingQueueSize() {
        BlockingQueue<Request> queue = this.processing_queue;
        return queue != null ? queue.size() : 0;
    }

    /**
     * Reports which fraction of all drained requests were down-requests and which were up-requests.
     * When nothing has been drained yet, both ratios are reported as 0 instead of dividing by zero
     * (which would render as "NaN").
     *
     * @return a string of the form {@code down=0.42 up=0.58}
     */
    @ManagedAttribute
    public String drainRatio() {
        long total = this.drained_total.sum();
        double down_ratio = 0.0;
        double up_ratio = 0.0;
        if (total > 0L) {
            down_ratio = (double)this.drained_down.sum() / (double)total;
            up_ratio = (double)this.drained_up.sum() / (double)total;
        }
        return String.format("down=%.2f up=%.2f", down_ratio, up_ratio);
    }

    /** Returns the raft-id of this node. */
    public String raftId() {
        return raft_id;
    }

    /**
     * Sets the raft-id of this node; a null argument is ignored.
     *
     * @return this instance
     */
    public RAFT raftId(String id) {
        if (id == null) {
            return this;
        }
        raft_id = id;
        return this;
    }

    /** Returns the current role implementation. */
    public RaftImpl impl() {
        return impl;
    }

    /** Returns the current majority value (the field is initialised to -1). */
    public int majority() {
        return majority;
    }

    /** Returns the class name of the log implementation. */
    public String logClass() {
        return log_class;
    }

    /**
     * Sets the class name of the log implementation that {@code start()} instantiates.
     *
     * @return this instance
     */
    public RAFT logClass(String clazz) {
        log_class = clazz;
        return this;
    }

    /** Returns the argument string for the log implementation. */
    public String logArgs() {
        return log_args;
    }

    /**
     * Sets the argument string for the log implementation.
     *
     * @return this instance
     */
    public RAFT logArgs(String args) {
        log_args = args;
        return this;
    }

    /** Returns the prefix used for the log name. */
    public String logPrefix() {
        return log_prefix;
    }

    /**
     * Sets the prefix used for the log name.
     *
     * @return this instance
     */
    public RAFT logPrefix(String name) {
        log_prefix = name;
        return this;
    }

    /** Returns the name of the log (assigned in {@code start()} when the log is created there). */
    public String logName() {
        return log_name;
    }

    /** Returns the resend interval in milliseconds. */
    public long resendInterval() {
        return resend_interval;
    }

    /**
     * Sets the resend interval in milliseconds.
     *
     * @return this instance
     */
    public RAFT resendInterval(long val) {
        resend_interval = val;
        return this;
    }

    /** Returns the value of the {@code send_commits_immediately} property. */
    public boolean sendCommitsImmediately() {
        return send_commits_immediately;
    }

    /**
     * Sets the {@code send_commits_immediately} property.
     *
     * @return this instance
     */
    public RAFT sendCommitsImmediately(boolean v) {
        send_commits_immediately = v;
        return this;
    }

    /** Returns the maximum log size in bytes. */
    public int maxLogSize() {
        return max_log_size;
    }

    /**
     * Sets the maximum log size in bytes.
     *
     * @return this instance
     */
    public RAFT maxLogSize(int val) {
        max_log_size = val;
        return this;
    }

    /** Returns the tracked log size in bytes ({@code snapshot()} resets it to 0). */
    public long currentLogSize() {
        return curr_log_size;
    }

    /** Returns the size of the request table, or 0 when there is no request table. */
    @ManagedAttribute(description="Number of pending requests")
    public int requestTableSize() {
        if (request_table == null) {
            return 0;
        }
        return request_table.size();
    }

    /** Returns the snapshot counter (incremented by {@code snapshot()}). */
    public int numSnapshots() {
        return num_snapshots;
    }

    /** Returns the leader recorded in the raft state. */
    @ManagedAttribute(description="The current leader (can be null if there is currently no leader) ")
    public Address leader() {
        return raft_state.leader();
    }

    /**
     * Records {@code new_leader} as the leader in the raft state.
     *
     * @return this instance
     */
    public RAFT leader(Address new_leader) {
        raft_state.setLeader(new_leader);
        return this;
    }

    /**
     * Tells whether this node is the current leader.
     * A missing leader never counts as "we are the leader", even when the local address is also
     * still null. This matches the leader checks in {@code setAsync()} and {@code handleDownRequest()},
     * which both reject a null leader.
     *
     * @return true if a leader is known and it equals the local address
     */
    public boolean isLeader() {
        Address current_leader = this.leader();
        return current_leader != null && current_leader.equals(this.local_addr);
    }

    /**
     * Sets the state machine.
     *
     * @return this instance
     */
    public RAFT stateMachine(StateMachine sm) {
        state_machine = sm;
        return this;
    }

    /** Returns the state machine (may be null if none was set). */
    public StateMachine stateMachine() {
        return state_machine;
    }

    /** Returns the commit table (may be null). */
    public CommitTable commitTable() {
        return commit_table;
    }

    /** Returns the current term as held by the raft state. */
    @ManagedAttribute(description="The current term. Incremented on leader change, or when a higher term is seen")
    public long currentTerm() {
        return raft_state.currentTerm();
    }

    /** Returns the voted-for address held by the raft state. */
    @ManagedAttribute(description="The member this member voted for in the current term")
    public Address votedFor() {
        return raft_state.votedFor();
    }

    /** Returns the index of the highest appended log entry. */
    public long lastAppended() {
        return last_appended;
    }

    /** Returns the index of the last committed log entry. */
    public long commitIndex() {
        return commit_index;
    }

    /** Returns the log implementation currently in use (may be a LogCache wrapper, or null). */
    public Log log() {
        return log_impl;
    }

    /**
     * Replaces the log implementation. When a non-null log is set, {@code start()} does not create one.
     *
     * @return this instance
     */
    public RAFT log(Log new_log) {
        log_impl = new_log;
        return this;
    }

    /**
     * Adds a role-change listener to the listener list.
     *
     * @return this instance
     */
    public RAFT addRoleListener(RoleChange c) {
        role_change_listeners.add(c);
        return this;
    }

    /**
     * Removes a role-change listener from the listener list.
     *
     * @return this instance
     */
    public RAFT remRoleListener(RoleChange c) {
        role_change_listeners.remove(c);
        return this;
    }

    /**
     * Sets the flag that {@code initStateMachineFromLog()} checks before loading the state machine.
     *
     * @return this instance
     */
    public RAFT stateMachineLoaded(boolean b) {
        state_machine_loaded = b;
        return this;
    }

    /** Returns whether requests are handled directly rather than via the processing queue. */
    public boolean synchronous() {
        return synchronous;
    }

    /**
     * Sets whether requests are handled directly (true) or added to the processing queue (false).
     *
     * @return this instance
     */
    public RAFT synchronous(boolean b) {
        synchronous = b;
        return this;
    }

    /** Resets all counters and averages of this protocol, including those of the log cache if one is in use. */
    @Override
    public void resetStats() {
        super.resetStats();

        // plain counters
        num_snapshot_received = 0;
        num_failed_append_requests_wrong_term = 0;
        num_failed_append_requests_not_found = 0;
        num_successful_append_requests = 0;
        num_resends = 0;
        num_snapshots = 0;

        if (log_impl instanceof LogCache) {
            LogCache cache = (LogCache)log_impl;
            cache.resetStats();
        }

        // drain statistics and batch-size average
        drained_total.reset();
        drained_avg.clear();
        drained_down.reset();
        drained_up.reset();
        avg_append_entries_batch_size.clear();
    }

    /** Returns the configured maximum size of the log cache. */
    @Property(description="Max size of the log cache (0 disables the log cache)", type=AttributeType.BYTES)
    public int maxLogCacheSize() {
        return _max_log_cache_size;
    }

    /**
     * Sets the maximum log cache size. If a log already exists, the change is applied right away:
     * an existing cache is resized, otherwise the cache is enabled (size &gt; 0) or disabled (size &lt;= 0).
     *
     * @return this instance
     */
    @Property
    public RAFT maxLogCacheSize(int size) {
        _max_log_cache_size = size;
        if (log_impl != null) {
            if (log_impl instanceof LogCache) {
                LogCache cache = (LogCache)log_impl;
                cache.maxSize(size);
            } else if (size > 0) {
                enableLogCache();
            } else {
                disableLogCache();
            }
        }
        return this;
    }

    /**
     * Stores the fsync setting and, if a log already exists, forwards it to the log.
     *
     * @return this instance
     */
    @Property(description="If true, a change is guaranteed to be written to disk when the call returns")
    public RAFT logUseFsync(boolean b) {
        _log_use_fsync = b;
        if (log_impl == null) {
            return this;
        }
        log_impl.useFsync(b);
        return this;
    }

    /**
     * Returns the fsync setting. The log's own value is reported once a log exists; before that
     * (the log is only created in {@code start()}) the configured value is returned instead of
     * failing with a NullPointerException.
     */
    @Property
    public boolean logUseFsync() {
        Log current = this.log_impl;
        return current != null ? current.useFsync() : this._log_use_fsync;
    }

    /** Returns the log cache's trim count, or 0 when the log is not a LogCache. */
    @ManagedAttribute(description="Number of times the log cache has been trimmed", type=AttributeType.SCALAR)
    public int logCacheNumTrims() {
        if (log_impl instanceof LogCache) {
            return ((LogCache)log_impl).numTrims();
        }
        return 0;
    }

    /** Returns the log cache's access count, or 0 when the log is not a LogCache. */
    @ManagedAttribute(description="Number of times the cache has been accessed", type=AttributeType.SCALAR)
    public int LogCacheNumAccesses() {
        if (log_impl instanceof LogCache) {
            return ((LogCache)log_impl).numAccesses();
        }
        return 0;
    }

    /** Returns the log cache's hit ratio, or 0.0 when the log is not a LogCache. */
    @ManagedAttribute(description="Hit ratio of the cache")
    public double logCacheHitRatio() {
        if (log_impl instanceof LogCache) {
            return ((LogCache)log_impl).hitRatio();
        }
        return 0.0;
    }

    /** Parses a comma-delimited list of member names and installs it via {@code members(Collection)}. */
    @Property(description="List of members (logical names); majority is computed from it")
    public void setMembers(String list) {
        members(Util.parseCommaDelimitedStrings(list));
    }

    /**
     * Installs the given member list in the persistent state and recomputes the majority.
     *
     * @return this instance
     */
    public RAFT members(Collection<String> list) {
        internal_state.setMembers(list);
        computeMajority();
        return this;
    }

    /** Returns the member list held by the persistent state. */
    @ManagedAttribute(description="The current list of members")
    public List<String> members() {
        return internal_state.getMembers();
    }

    /**
     * Passes {@code new_term} to the raft state's {@code tryAdvanceTerm} and returns its result.
     * {@code handleUpRequest()} drops the message when this result is negative.
     */
    public int currentTerm(long new_term) {
        return raft_state.tryAdvanceTerm(new_term);
    }

    /**
     * Records {@code mbr} as the voted-for member in the raft state.
     *
     * @return this instance
     */
    public RAFT votedFor(Address mbr) {
        raft_state.setVotedFor(mbr);
        return this;
    }

    /** Returns the simple class name of the current role implementation. */
    @ManagedAttribute(description="The current role")
    public String role() {
        Class<?> role_class = impl.getClass();
        return role_class.getSimpleName();
    }

    /** Returns the commit table's string form preceded by a newline, or "n/a" when there is no commit table. */
    @ManagedOperation(description="Dumps the commit table")
    public String dumpCommitTable() {
        if (commit_table == null) {
            return "n/a";
        }
        return "\n" + String.valueOf(commit_table);
    }

    /**
     * Returns the number of entries in the log, or 0 when no log exists yet (the log is only
     * created in {@code start()}, but this attribute can be read before then).
     */
    @ManagedAttribute(description="Number of log entries in the log")
    public long logSize() {
        Log current = this.log_impl;
        return current != null ? current.size() : 0L;
    }

    /**
     * Describes the log in use. For a log cache, the cache class, its current/max size and the
     * wrapped log's class are shown; otherwise just the log's class name. When no log exists yet,
     * "n/a" is returned (same placeholder as {@code dumpCommitTable()}) instead of throwing a
     * NullPointerException.
     */
    @ManagedAttribute(description="Describes the log")
    public String logDescription() {
        Log current = this.log_impl;
        if (current == null) {
            return "n/a";
        }
        if (current instanceof LogCache) {
            LogCache lc = (LogCache)current;
            return String.format("%s (%d/%d) -> %s", lc.getClass().getSimpleName(), lc.cacheSize(), lc.maxSize(), lc.log().getClass().getSimpleName());
        }
        return current.getClass().getSimpleName();
    }

    /** Returns the size of the log in bytes, as reported by the log implementation. */
    @ManagedOperation(description="Number of bytes in the log")
    public long logSizeInBytes() {
        final long bytes = log_impl.sizeInBytes();
        return bytes;
    }

    /**
     * Dumps index, term and command size of the log entries in the range
     * {@code [max(1, last_appended - last_n), last_appended]}. Note that the lower bound is
     * {@code last_appended - last_n}; {@code dumpLog()} relies on this when it passes
     * {@code last_appended - 1} to dump everything.
     * Entries without a command are reported with 0 bytes rather than causing a
     * NullPointerException ({@code initStateMachineFromLog()} shows that commands can be null).
     *
     * @param last_n how far back from {@code last_appended} the dump starts
     * @return one line per entry
     */
    @ManagedOperation(description="Dumps the last N log entries")
    public String dumpLog(long last_n) {
        StringBuilder sb = new StringBuilder();
        long to = this.last_appended;
        long from = Math.max(1L, to - last_n);
        this.log_impl.forEach((entry, index) -> {
            byte[] cmd = entry.command();
            int num_bytes = cmd != null ? cmd.length : 0;
            sb.append("index=").append(index).append(", term=").append(entry.term()).append(" (").append(num_bytes).append(" bytes)\n");
        }, from, to);
        return sb.toString();
    }

    /** Dumps the log by calling {@code dumpLog(long)} with {@code last_appended - 1}. */
    @ManagedOperation(description="Dumps all log entries")
    public String dumpLog() {
        final long last_n = last_appended - 1L;
        return dumpLog(last_n);
    }

    /**
     * Wraps the current log in a LogCache unless it already is one. Logs an error instead when the
     * configured maximum cache size is not positive.
     */
    @ManagedOperation(description="Enabled the log cache")
    public void enableLogCache() {
        if (log_impl instanceof LogCache) {
            return;
        }
        if (_max_log_cache_size > 0) {
            log_impl = new LogCache(log_impl, _max_log_cache_size);
            return;
        }
        log.error("cannot enable log cache as max_log_cache_size is 0");
    }

    /** If the log is a LogCache, replaces it with the wrapped log and clears the cache; otherwise a no-op. */
    @ManagedOperation(description="Disables the log cache")
    public void disableLogCache() {
        if (!(log_impl instanceof LogCache)) {
            return;
        }
        LogCache cache = (LogCache)log_impl;
        log_impl = cache.log();
        cache.clear();
    }

    /**
     * Clears the log cache if the log is a LogCache.
     *
     * @return this instance
     */
    @ManagedOperation(description="Clears the log cache")
    public RAFT clearLogCache() {
        if (log_impl instanceof LogCache) {
            LogCache cache = (LogCache)log_impl;
            cache.clear();
        }
        return this;
    }

    /**
     * Trims the log cache if the log is a LogCache.
     *
     * @return this instance
     */
    @ManagedOperation(description="Trims the log cache to max_log_cache_size")
    public RAFT trimLogCache() {
        if (log_impl instanceof LogCache) {
            LogCache cache = (LogCache)log_impl;
            cache.trim();
        }
        return this;
    }

    /** Passes {@code func} to the log's {@code forEach}, which receives each entry with its index. */
    public void logEntries(ObjLongConsumer<LogEntry> func) {
        log_impl.forEach(func);
    }

    /** Delegates to the raft state's {@code advanceTermForElection()} and returns its result. */
    public long createNewTerm() {
        final long term = raft_state.advanceTermForElection();
        return term;
    }

    /**
     * Walks the protocol chain from {@code start} and returns the first protocol that is an
     * instance of {@code clazz}.
     * Uses {@link Class#isInstance} and {@link Class#cast} instead of an unchecked {@code (T)} cast.
     *
     * @param clazz the type to look for; if null, null is returned
     * @param start the protocol at which the search starts (inclusive); may be null
     * @param down2 if true, follow the down-protocols, otherwise the up-protocols
     * @return the first matching protocol, or null if none was found
     */
    public static <T> T findProtocol(Class<T> clazz, Protocol start, boolean down2) {
        if (clazz == null) {
            return null;
        }
        Protocol prot = start;
        while (prot != null) {
            if (clazz.isInstance(prot)) {
                return clazz.cast(prot);
            }
            prot = down2 ? prot.getDownProtocol() : prot.getUpProtocol();
        }
        return null;
    }

    /** Requests a membership change of type {@code addServer} for {@code name} via {@code changeMembers}. */
    @Override
    @ManagedOperation(description="Adds a new server to members. Prevents duplicates")
    public CompletableFuture<byte[]> addServer(String name) throws Exception {
        final InternalCommand.Type type = InternalCommand.Type.addServer;
        return changeMembers(name, type);
    }

    /** Requests a membership change of type {@code removeServer} for {@code name} via {@code changeMembers}. */
    @Override
    @ManagedOperation(description="Removes a new server from members")
    public CompletableFuture<byte[]> removeServer(String name) throws Exception {
        final InternalCommand.Type type = InternalCommand.Type.removeServer;
        return changeMembers(name, type);
    }

    /**
     * Writes the persistent state followed by the state machine contents into a buffer, stores
     * that buffer as the log's snapshot and truncates the log at the commit index. Afterwards the
     * snapshot counter is incremented and the tracked log size is reset to 0.
     *
     * @throws IllegalStateException if no state machine has been set
     */
    @ManagedOperation(description="Creates a new snapshot and truncates the log")
    public synchronized void snapshot() throws Exception {
        if (state_machine == null) {
            throw new IllegalStateException("state machine is null");
        }
        ByteArrayDataOutputStream stream = new ByteArrayDataOutputStream(128, true);
        internal_state.writeTo(stream);
        state_machine.writeContentTo(stream);

        ByteBuffer snapshot_buf = ByteBuffer.wrap(stream.buffer(), 0, stream.position());
        log_impl.setSnapshot(snapshot_buf);
        log_impl.truncate(commitIndex());

        num_snapshots++;
        curr_log_size = 0L;
    }

    /**
     * Initialises the state machine from the log; a no-op when there is no state machine or it has
     * already been loaded. If a snapshot exists, the persistent state and the state machine are
     * read from it first. Then the entries from the start index up to the commit index are replayed:
     * internal commands go to {@code executeInternalCommand}, all others are applied to the state
     * machine; entries without a command are skipped. Replay stops at the first missing entry.
     */
    @ManagedOperation(description="Reads the snapshot (if present) and loads log entries from [first .. commit_index] into the state machine")
    public void initStateMachineFromLog() throws Exception {
        if (state_machine_loaded || state_machine == null) {
            return;
        }
        ByteBuffer snapshot = log_impl.getSnapshot();
        boolean has_snapshot = snapshot != null;
        if (has_snapshot) {
            ByteArrayDataInputStream input = new ByteArrayDataInputStream(snapshot);
            internal_state.readFrom(input);
            state_machine.readContentFrom(input);
            log.debug("%s: initialized state machine from snapshot (%d bytes)", local_addr, snapshot.position());
        }
        // with a snapshot present, replay starts one past the first appended index
        long first = log_impl.firstAppended() + (has_snapshot ? 1L : 0L);
        final long from = Math.max(1L, first);
        final long to = commit_index;
        long applied = 0L;
        long idx = from;
        while (idx <= to) {
            LogEntry current = log_impl.get(idx);
            if (current == null) {
                log.error("%s: log entry for index %d not found in log", local_addr, idx);
                break;
            }
            if (current.command != null) {
                if (current.internal) {
                    executeInternalCommand(null, current.command, current.offset, current.length);
                } else {
                    state_machine.apply(current.command, current.offset, current.length, true);
                    applied++;
                }
            }
            idx++;
        }
        state_machine_loaded = true;
        if (applied > 0L) {
            log.debug("%s: applied %d entries from the log (%d - %d) to the state machine", local_addr, applied, from, to);
        }
    }

    /**
     * Initialises the protocol: removes duplicate members (logging an error if any were found),
     * computes the majority, defaults the raft-id to the local host name, registers an address
     * generator that stores the raft-id in the channel's ExtendedUUID, and creates the processing
     * queue and the runner that executes {@code processQueue()}.
     *
     * @throws Exception if the superclass initialisation or the host name lookup fails
     */
    @Override
    public void init() throws Exception {
        super.init();
        HashSet<String> tmp = new HashSet<String>(this.internal_state.getMembers());
        if (tmp.size() != this.internal_state.getMembers().size()) {
            this.log.error("members (%s) contains duplicates; removing them and setting members to %s", this.internal_state.getMembers(), tmp);
            this.internal_state.setMembers(new ArrayList<String>(tmp));
        }
        this.computeMajority();
        if (this.raft_id == null) {
            this.raft_id = InetAddress.getLocalHost().getHostName();
        }
        // single assignment keeps 'ch' effectively final for the lambda below
        final JChannel ch = this.stack != null ? this.stack.getChannel() : null;
        if (ch != null) {
            ch.addAddressGenerator(() -> {
                ExtendedUUID.setPrintFunction(print_function);
                return ExtendedUUID.randomUUID(ch.getName()).put(raft_id_key, Util.stringToBytes(this.raft_id));
            });
        }
        this.processing_queue = new ArrayBlockingQueue<Request>(this.processing_queue_max_size);
        this.runner = new Runner(new DefaultThreadFactory("runner", true, true), "runner", this::processQueue, null);
    }

    /**
     * Starts the protocol: creates and initialises the log from {@code log_class} unless a log was
     * set beforehand, loads {@code last_appended}, {@code commit_index} and the raft state,
     * initialises the state machine from the log, verifies that the raft-id is listed in the
     * members, applies the fsync setting, wraps the log in a LogCache if caching is enabled, and
     * starts the runner.
     *
     * @throws IllegalStateException if {@code log_class} is null, if the local address is not an
     *         ExtendedUUID (including null), or if the raft-id is not part of the members
     */
    @Override
    public void start() throws Exception {
        super.start();
        if (this.log_impl == null) {
            if (this.log_class == null) {
                throw new IllegalStateException("log_class has to be defined");
            }
            Class<?> clazz = Util.loadClass(this.log_class, this.getClass());
            this.log_impl = (Log)clazz.getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            Map<Object, Object> args = this.log_args != null && !this.log_args.isEmpty() ? RAFT.parseCommaDelimitedProps(this.log_args) : new HashMap();
            if (this.log_prefix == null) {
                this.log_prefix = this.raft_id;
            }
            this.log_name = this.createLogName(this.log_prefix, "log");
            this.log_impl.init(this.log_name, args);
        }
        if (!(this.local_addr instanceof ExtendedUUID)) {
            // a null address also ends up here; don't dereference it while building the message
            String actual = this.local_addr != null ? this.local_addr.getClass().getSimpleName() : "null";
            throw new IllegalStateException("local address must be an ExtendedUUID but is a " + actual);
        }
        this.last_appended = this.log_impl.lastAppended();
        this.commit_index = this.log_impl.commitIndex();
        this.raft_state.reload();
        this.log.trace("%s: set last_appended=%d, commit_index=%d, current_state=%s", this.local_addr, this.last_appended, this.commit_index, this.raft_state);
        this.initStateMachineFromLog();
        if (!this.internal_state.getMembers().contains(this.raft_id)) {
            throw new IllegalStateException(String.format("raft-id %s is not listed in members %s", this.raft_id, this.internal_state.getMembers()));
        }
        this.curr_log_size = this.logSizeInBytes();
        this.log_impl.useFsync(this._log_use_fsync);
        // don't wrap a log that is already a LogCache (same check as in enableLogCache())
        if (this._max_log_cache_size > 0 && !(this.log_impl instanceof LogCache)) {
            this.log_impl = new LogCache(this.log_impl, this._max_log_cache_size);
        }
        this.runner.start();
    }

    /**
     * Stops the protocol: completes the add-server future with null, stops the runner, destroys
     * the current role implementation and closes the log.
     */
    @Override
    public void stop() {
        super.stop();
        add_server_future.complete(null);
        runner.stop();
        impl.destroy();
        Closeable closeable_log = (Closeable)log_impl;
        Util.close(closeable_log);
    }

    /**
     * Handles view changes travelling down the stack and then passes the event on to the next
     * protocol below.
     *
     * @return the result of the down-protocol's {@code down(evt)}
     */
    @Override
    public Object down(Event evt) {
        // named constant instead of the literal 6
        if (evt.getType() == Event.VIEW_CHANGE) {
            this.handleView((View)evt.getArg());
        }
        return this.down_prot.down(evt);
    }

    /**
     * Handles view changes travelling up the stack and then passes the event on to the next
     * protocol above.
     *
     * @return the result of the up-protocol's {@code up(evt)}
     */
    @Override
    public Object up(Event evt) {
        // named constant instead of the literal 6
        if (evt.getType() == Event.VIEW_CHANGE) {
            this.handleView((View)evt.getArg());
        }
        return this.up_prot.up(evt);
    }

    /**
     * Intercepts messages carrying a RaftHeader: they are handled directly in synchronous mode,
     * or queued as an UpRequest otherwise, and null is returned. All other messages are passed up.
     */
    @Override
    public Object up(Message msg) {
        RaftHeader raft_hdr = (RaftHeader)msg.getHeader(id);
        if (raft_hdr == null) {
            return up_prot.up(msg);
        }
        if (!synchronous) {
            add(new UpRequest(msg, raft_hdr));
        } else {
            handleUpRequest(msg, raft_hdr);
        }
        return null;
    }

    /**
     * Removes all messages carrying a RaftHeader from the batch and handles them (directly in
     * synchronous mode, otherwise queued as UpRequests). What remains of the batch is passed up
     * unless it is empty.
     */
    @Override
    public void up(MessageBatch batch) {
        for (Iterator<Message> iter = batch.iterator(); iter.hasNext(); ) {
            Message current = iter.next();
            RaftHeader raft_hdr = (RaftHeader)current.getHeader(id);
            if (raft_hdr != null) {
                iter.remove();
                if (synchronous) {
                    handleUpRequest(current, raft_hdr);
                } else {
                    add(new UpRequest(current, raft_hdr));
                }
            }
        }
        if (batch.isEmpty()) {
            return;
        }
        up_prot.up(batch);
    }

    /** Calls {@code sendAppendEntriesMessage} for every commit-table entry; a no-op without a commit table. */
    @ManagedOperation(description="Sends all pending AppendEntriesRequests")
    public void flushCommitTable() {
        if (commit_table == null) {
            return;
        }
        commit_table.forEach(this::sendAppendEntriesMessage);
    }

    /**
     * Calls {@code sendAppendEntriesMessage} for the commit-table entry of {@code member}, if
     * there is one.
     *
     * @throws IllegalStateException if this node is not the leader
     * @throws NullPointerException if {@code member} is null
     */
    public void flushCommitTable(Address member) {
        if (!isLeader()) {
            throw new IllegalStateException("Currently not the leader, should be " + String.valueOf(leader()));
        }
        CommitTable.Entry entry = commit_table.get(Objects.requireNonNull(member));
        if (entry == null) {
            return;
        }
        sendAppendEntriesMessage(member, entry);
    }

    /** Delegates to the five-argument {@code setAsync} with {@code internal} set to false. */
    @Override
    public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, Options options) {
        final boolean internal = false;
        return setAsync(buf, offset, length, internal, options);
    }

    /**
     * Submits a change. In synchronous mode the change is handled on the calling thread;
     * otherwise it is queued as a DownRequest.
     * Throws the exception created by {@code notCurrentLeader()} when no leader is known, or when
     * the local address is set and differs from the leader.
     *
     * @return a future for the result; it is completed exceptionally right away with an
     *         IllegalStateException when there is no request table
     * @throws IllegalArgumentException if {@code buf} is null
     */
    public CompletableFuture<byte[]> setAsync(byte[] buf, int offset, int length, boolean internal, Options options) {
        Address current_leader = leader();
        boolean accepted = current_leader != null && (local_addr == null || current_leader.equals(local_addr));
        if (!accepted) {
            throw notCurrentLeader();
        }
        if (buf == null) {
            throw new IllegalArgumentException("buffer must not be null");
        }
        CompletableFuture<byte[]> future = new CompletableFuture<byte[]>();
        if (request_table == null) {
            String role_name = impl.getClass().getSimpleName();
            future.completeExceptionally(new IllegalStateException("request table was null on " + role_name));
            return future;
        }
        if (!synchronous) {
            add(new DownRequest(future, buf, offset, length, internal, options));
        } else {
            handleDownRequest(future, buf, offset, length, internal, options);
        }
        return future;
    }

    /** Returns a one-line summary: class name, local address, commit index, last appended index and raft state. */
    @Override
    public String toString() {
        String class_name = RAFT.class.getSimpleName();
        return String.format("%s %s: commit=%d last-appended=%d curr-state=%s", class_name, local_addr, commit_index, last_appended, raft_state);
    }

    /**
     * Adds a request to the processing queue, blocking while the queue is full.
     * If the calling thread is interrupted while waiting, the request is dropped, the failure is
     * logged and the thread's interrupt flag is restored so that callers further up can still
     * observe the interruption.
     */
    protected void add(Request r) {
        try {
            this.processing_queue.put(r);
        }
        catch (InterruptedException ex) {
            this.log.error("%s: failed adding %s to processing queue: %s", this.local_addr, r, ex);
            // catching InterruptedException clears the flag; set it again
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Handles a single change submitted on this node: appends it to the log, registers the
     * future in the request table, sends an AppendEntriesRequest to the other members and then
     * checks whether entries can already be committed.
     * Throws the exception created by notCurrentLeader() if no leader is known or the leader is
     * not the local address.
     *
     * @param f        future registered in the request table for the new entry's index
     * @param buf      buffer holding the command
     * @param offset   offset of the command in buf
     * @param length   length of the command; also passed to snapshotIfNeeded()
     * @param internal whether the entry is an internal command
     * @param opts     options passed to the request table
     */
    protected void handleDownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length, boolean internal, Options opts) {
        Address leader = this.leader();
        if (leader == null || !Objects.equals(leader, this.local_addr)) {
            throw this.notCurrentLeader();
        }
        RequestTable<String> reqtab = this.request_table;
        // prev_index is the old last_appended; curr_index (= prev_index + 1) is the new entry's index
        long prev_index = this.last_appended++;
        long curr_index = this.last_appended;
        long current_term = this.currentTerm();
        LogEntry entry = this.log_impl.get(prev_index);
        // term 0 is used when there is no previous entry
        long prev_term = entry != null ? entry.term : 0L;
        LogEntries entries = new LogEntries().add(new LogEntry(current_term, buf, offset, length, internal));
        // the log's return value becomes the new last_appended
        this.last_appended = this.log_impl.append(curr_index, entries);
        this.num_successful_append_requests += entries.size();
        reqtab.create(curr_index, this.raft_id, f, this::majority, opts);
        // null destination, and flagged DONT_LOOPBACK
        Message msg = new ObjectMessage(null, entries).putHeader(this.id, new AppendEntriesRequest(this.local_addr, current_term, prev_index, prev_term, current_term, this.commit_index)).setFlag(Message.TransientFlag.DONT_LOOPBACK);
        this.down_prot.down(msg);
        this.snapshotIfNeeded(length);
        // advance past every index the request table already reports as committed
        long highest_committed = prev_index + 1L;
        while (reqtab.isCommitted(highest_committed)) {
            ++highest_committed;
        }
        // NOTE(review): after the loop highest_committed is the first index NOT reported as committed;
        // presumably commitLogTo() bounds it - confirm
        if (highest_committed > prev_index + 1L) {
            this.commitLogTo(highest_committed, true);
        }
    }

    /**
     * Dispatches a received RAFT message to the current role implementation, based on the header type.
     * The message is dropped when {@code currentTerm(hdr.curr_term)} returns a negative value or
     * when there is no role implementation. For an AppendEntriesRequest, a response carrying the
     * result and the current commit index is sent back to the sender. Unknown header types are
     * logged as a warning.
     */
    public void handleUpRequest(Message msg, RaftHeader hdr) {
        if (currentTerm(hdr.curr_term) < 0) {
            return;
        }
        RaftImpl role_impl = impl;
        if (role_impl == null) {
            return;
        }

        if (hdr instanceof AppendEntriesRequest) {
            // the term is read before the request is handled, and used in the response
            long term = currentTerm();
            AppendEntriesRequest req = (AppendEntriesRequest)hdr;
            ObjectMessage obj_msg = (ObjectMessage)msg;
            log.trace("%s: from %s, %s header %s", local_addr, msg.src(), obj_msg, req);
            AppendResult result = role_impl.handleAppendEntriesRequest((LogEntries)obj_msg.getObject(), msg.src(), req.prev_log_index, req.prev_log_term, req.entry_term, req.leader_commit);
            result.commitIndex(commit_index);
            Message response = new EmptyMessage(msg.src()).putHeader(id, new AppendEntriesResponse(term, result));
            down_prot.down(response);
            return;
        }

        if (hdr instanceof AppendEntriesResponse) {
            AppendEntriesResponse response_hdr = (AppendEntriesResponse)hdr;
            log.trace("%s: from %s res %s", local_addr, msg.src(), response_hdr);
            role_impl.handleAppendEntriesResponse(msg.src(), response_hdr.curr_term, response_hdr.result);
            return;
        }

        if (hdr instanceof InstallSnapshotRequest) {
            InstallSnapshotRequest snapshot_req = (InstallSnapshotRequest)hdr;
            role_impl.handleInstallSnapshotRequest(msg, snapshot_req.leader, snapshot_req.last_included_index, snapshot_req.last_included_term);
            return;
        }

        log.warn("%s: invalid header %s", local_addr, hdr.getClass().getCanonicalName());
    }

    /**
     * One iteration of the runner: waits up to {@code resend_interval} ms for a request. On a
     * timeout, {@code sendAppendEntriesMessage} is called for every commit-table entry (if a
     * commit table exists). Otherwise the request, plus everything else that can be drained from
     * the queue, is processed in batches until the queue yields nothing more; the drain statistics
     * are updated per batch. Returns silently when interrupted while waiting.
     */
    protected void processQueue() {
        Request head;
        try {
            head = processing_queue.poll(resend_interval, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException interrupted) {
            return;
        }
        if (head == null) {
            if (commit_table != null) {
                commit_table.forEach(this::sendAppendEntriesMessage);
            }
            return;
        }

        remove_queue.clear();
        remove_queue.add(head);
        for (;;) {
            processing_queue.drainTo(remove_queue);
            int batch_size = remove_queue.size();
            if (batch_size == 0) {
                return;
            }
            drained_total.add(batch_size);
            drained_avg.add(batch_size);
            int down_count = 0;
            int up_count = 0;
            for (Request req : remove_queue) {
                if (req instanceof DownRequest) {
                    down_count++;
                } else if (req instanceof UpRequest) {
                    up_count++;
                }
            }
            drained_down.add(down_count);
            drained_up.add(up_count);

            process(remove_queue);
            remove_queue.clear();
        }
    }

    /**
     * Processes one batch of queued requests.
     * <p>
     * {@link UpRequest}s are dispatched to {@link #handleUpRequest} one by one. {@link DownRequest}s are
     * collected into a single {@link LogEntries} batch, each one being registered in the request table under
     * consecutive indexes starting at {@code last_appended + 1}. If the batch is non-empty, it is sent down
     * in one {@link AppendEntriesRequest} message, appended to the local log, and any indexes the request
     * table already reports as committed are applied.
     *
     * @param q the requests to process, in order
     * @throws IllegalStateException if entries were collected but the leader captured at the start of the
     *         call is {@code null} or is not {@code local_addr}
     */
    protected void process(List<Request> q) {
        // Values captured once, before any request of this batch is handled
        RequestTable<String> reqtab = this.request_table;
        LogEntries entries = new LogEntries();
        long index = this.last_appended + 1L;
        int length = 0;
        long current_term = this.currentTerm();
        Address leader = this.leader();
        for (Request r : q) {
            try {
                if (r instanceof UpRequest) {
                    UpRequest up2 = (UpRequest)r;
                    this.handleUpRequest(up2.msg, up2.hdr);
                    continue;
                }
                if (!(r instanceof DownRequest)) continue;
                DownRequest dr = (DownRequest)r;
                if (!this.isLeader()) {
                    // Fail the caller's future right away instead of batching the request
                    dr.f.completeExceptionally(this.notCurrentLeader());
                    continue;
                }
                entries.add(new LogEntry(current_term, dr.buf, dr.offset, dr.length, dr.internal));
                // Register the future under the index this entry will get in the log
                reqtab.create(index++, this.raft_id, dr.f, this::majority, dr.options);
                length += dr.length;
            }
            catch (Throwable ex) {
                // A failing request is logged and skipped; the rest of the batch is still processed
                this.log.error("%s: failed handling request %s: %s", this.local_addr, r, ex);
            }
        }
        if (entries.size() == 0) {
            return;
        }
        // NOTE(review): when this throws, the entries created in reqtab above are not completed or removed
        // in this method - confirm whether something else cleans them up.
        if (leader == null || !Objects.equals(leader, this.local_addr)) {
            throw this.notCurrentLeader();
        }
        long prev_index = this.last_appended;
        long curr_index = this.last_appended + 1L;
        LogEntry entry = this.log_impl.get(prev_index);
        long prev_term = entry != null ? entry.term : 0L;
        // Destination is null and DONT_LOOPBACK is set; the message is sent down before the local append
        Message msg = new ObjectMessage(null, entries).putHeader(this.id, new AppendEntriesRequest(this.local_addr, current_term, prev_index, prev_term, current_term, this.commit_index)).setFlag(Message.TransientFlag.DONT_LOOPBACK);
        this.down_prot.down(msg);
        this.last_appended = this.log_impl.append(curr_index, entries);
        int batch_size = entries.size();
        this.num_successful_append_requests += batch_size;
        this.avg_append_entries_batch_size.add(batch_size);
        // Advance past every index of this batch that the request table already reports as committed
        long highest_committed = prev_index + 1L;
        while (reqtab.isCommitted(highest_committed)) {
            ++highest_committed;
        }
        if (highest_committed > prev_index + 1L) {
            this.commitLogTo(highest_committed, true);
        }
        // length is the sum of the DownRequest payload lengths added in this batch
        this.snapshotIfNeeded(length);
    }

    /**
     * Builds (but does not throw) the exception used to reject operations that require this node to be
     * the leader. The message contains the local address and the value currently returned by
     * {@code leader()}.
     *
     * @return a new {@link IllegalStateException}
     */
    IllegalStateException notCurrentLeader() {
        final String self = String.valueOf(this.local_addr);
        final String current_leader = String.valueOf(this.leader());
        return new IllegalStateException("I'm not the leader (local_addr=" + self + ", leader=" + current_leader + ")");
    }

    /**
     * Replaces {@code request_table} with a fresh table and registers an entry, with a {@code null}
     * future, for every index in {@code (commit_index, last_appended]}.
     */
    protected void createRequestTable() {
        // Diamond instead of the raw type, so the assignment to the typed field is checked by the compiler
        this.request_table = new RequestTable<>();
        for (long i = this.commit_index + 1L; i <= this.last_appended; ++i) {
            this.request_table.create(i, this.raft_id, null, this::majority);
        }
    }

    /**
     * Replaces {@code commit_table} with a new table built from the members of the current view, minus
     * {@code local_addr}, using {@code last_appended + 1} as the index argument. With no view installed
     * the table is created from an empty member list.
     */
    protected void createCommitTable() {
        // Read the view once so the null check and getMembers() refer to the same object
        View current_view = this.view;
        ArrayList<Address> mbrs = current_view != null ? new ArrayList<Address>(current_view.getMembers()) : new ArrayList<Address>();
        mbrs.remove(this.local_addr);
        this.commit_table = new CommitTable(mbrs, this.last_appended + 1L);
    }

    /**
     * Adds {@code name} to the member list held by {@code internal_state} and recomputes the majority.
     * Does nothing when {@code name} is {@code null} or already listed.
     *
     * @param name the member name to add; may be {@code null}
     */
    protected void _addServer(String name) {
        if (name == null) {
            return;
        }
        final List<String> members = this.internal_state.getMembers();
        if (members.contains(name)) {
            return;
        }
        members.add(name);
        this.internal_state.setMembers(members);
        this.computeMajority();
    }

    /**
     * Removes {@code name} from the member list held by {@code internal_state} and recomputes the
     * majority. Does nothing when {@code name} is {@code null} or not listed.
     *
     * @param name the member name to remove; may be {@code null}
     */
    protected void _removeServer(String name) {
        if (name == null) {
            return;
        }
        final List<String> members = this.internal_state.getMembers();
        final boolean was_member = members.remove(name);
        if (!was_member) {
            return;
        }
        this.internal_state.setMembers(members);
        this.computeMajority();
    }

    /**
     * Decides what, if anything, needs to be (re)sent to {@code member}, based on its commit-table entry.
     * The first matching case wins:
     * <ol>
     *   <li>{@code nextIndex} is below the log's {@code firstAppended()}: a snapshot is sent; a failure is
     *       logged together with its cause and not propagated.</li>
     *   <li>{@code last_appended >= nextIndex}: the range starting at {@code max(nextIndex, 1)} is resent,
     *       up to {@code nextIndex} if the entry asks for a single message, else up to
     *       {@code last_appended}.</li>
     *   <li>{@code last_appended > matchIndex}: the entry at {@code last_appended} is resent, provided that
     *       index is positive.</li>
     *   <li>{@code commit_index} is ahead of the member's commit index: an {@link AppendEntriesRequest}
     *       with a {@code null} payload is sent, carrying the current commit index.</li>
     *   <li>{@code commit_index < last_appended}: the range {@code commit_index + 1 .. last_appended} is
     *       resent.</li>
     * </ol>
     *
     * @param member the destination
     * @param e      the commit-table entry tracked for {@code member}
     */
    protected void sendAppendEntriesMessage(Address member, CommitTable.Entry e) {
        if (e.nextIndex() < this.log().firstAppended()) {
            try {
                this.sendSnapshotTo(member);
            }
            catch (Exception ex) {
                // Include the cause (it used to be dropped); the label now matches the value, firstAppended()
                this.log.error("%s: failed sending snapshot to %s: next_index=%d, first_appended=%d: %s", this.local_addr, member, e.nextIndex(), this.log().firstAppended(), ex);
            }
            return;
        }
        if (this.last_appended >= e.nextIndex()) {
            long to = e.sendSingleMessage() ? e.nextIndex() : this.last_appended;
            long from = Math.max(e.nextIndex(), 1L);
            if (this.log.isTraceEnabled()) {
                this.log.trace("%s: resending [%d..%d] to %s", this.local_addr, from, to, member);
            }
            this.resend(member, from, to);
            return;
        }
        if (this.last_appended > e.matchIndex()) {
            long index = this.last_appended;
            if (index > 0L) {
                this.log.trace("%s: resending %d to %s", this.local_addr, index, member);
                this.resend(member, index);
            }
            return;
        }
        if (this.commit_index > e.commitIndex()) {
            // No entries to send: the payload is null and only the header (with commit_index) matters
            long current_term = this.currentTerm();
            Message msg = new ObjectMessage(member, null).putHeader(this.id, new AppendEntriesRequest(this.local_addr, current_term, 0L, 0L, current_term, this.commit_index));
            this.down_prot.down(msg);
            return;
        }
        if (this.commit_index < this.last_appended) {
            this.resend(member, this.commit_index + 1L, this.last_appended);
        }
    }

    /**
     * Submits an internal membership-change command through {@code setAsync}.
     * <p>
     * The submission is chained behind the previously stored {@code add_server_future}: it starts once that
     * future has completed, whatever its outcome, since both the result and the throwable are ignored. The
     * resulting future replaces {@code add_server_future} and is returned.
     *
     * @param name the member name the command refers to
     * @param type the kind of internal command to create
     * @return a future completed with the result of the submitted command
     * @throws Exception             if {@code dynamic_view_changes} is disabled
     * @throws IllegalStateException if this node is not the current leader
     */
    protected CompletableFuture<byte[]> changeMembers(String name, InternalCommand.Type type) throws Exception {
        if (!this.dynamic_view_changes) {
            throw new Exception("dynamic view changes are not allowed; set dynamic_view_changes to true to enable it");
        }
        Address leader = this.leader();
        if (leader == null || !Objects.equals(leader, this.local_addr)) {
            throw this.notCurrentLeader();
        }
        InternalCommand cmd = new InternalCommand(type, name);
        byte[] buf = Util.streamableToByteBuffer(cmd);
        // handle() yields a future of a future; thenCompose(identity) flattens it. No raw cast is needed,
        // so the compiler checks the element type all the way through.
        this.add_server_future = this.add_server_future
                .handle((ignore, t) -> this.setAsync(buf, 0, buf.length, true, null))
                .thenCompose(Function.identity());
        return this.add_server_future;
    }

    /**
     * Resends the single log entry at {@code index} to {@code target} and increments {@code num_resends}.
     * If the log has no entry at {@code index}, an error is logged and nothing is sent. The term of the
     * preceding entry is reported as 0 when that entry is not found.
     *
     * @param target the destination
     * @param index  the log index of the entry to resend
     */
    protected void resend(Address target, long index) {
        final LogEntry current = this.log_impl.get(index);
        if (current == null) {
            this.log.error("%s: resending of %d failed; entry not found", this.local_addr, index);
            return;
        }
        final long prev_index = index - 1L;
        final LogEntry before = this.log_impl.get(prev_index);
        final long prev_term = before == null ? 0L : before.term;
        final LogEntries single = new LogEntries().add(current);
        final Message msg = new ObjectMessage(target, single).putHeader(this.id, new AppendEntriesRequest(this.local_addr, this.currentTerm(), prev_index, prev_term, current.term, this.commit_index));
        this.down_prot.down(msg);
        ++this.num_resends;
    }

    /**
     * Resends the log entries in {@code [from .. to]} to {@code target} in one message and increments
     * {@code num_resends}.
     * <p>
     * Collection stops at the first index with no entry (an error is logged); whatever was collected up to
     * that point, possibly nothing, is still sent. The entry term placed in the header is taken from the
     * first collected entry whose term is positive, and stays 0 otherwise.
     *
     * @param target the destination
     * @param from   first index to resend (inclusive)
     * @param to     last index to resend (inclusive)
     */
    protected void resend(Address target, long from, long to) {
        final LogEntries batch = new LogEntries();
        long first_term = 0L;
        long cursor = from;
        while (cursor <= to) {
            final LogEntry item = this.log_impl.get(cursor);
            if (item == null) {
                this.log.error("%s: resending of %d failed; entry not found", this.local_addr, cursor);
                break;
            }
            if (first_term <= 0L) {
                first_term = item.term();
            }
            batch.add(item);
            ++cursor;
        }
        final LogEntry before = this.log_impl.get(from - 1L);
        final long prev_term = before != null ? before.term : 0L;
        final Message msg = new ObjectMessage(target, batch).putHeader(this.id, new AppendEntriesRequest(this.local_addr, this.currentTerm(), from - 1L, prev_term, first_term, this.commit_index));
        this.down_prot.down(msg);
        ++this.num_resends;
    }

    /**
     * Takes a snapshot and sends it to {@code dest} in an {@link InstallSnapshotRequest} message.
     * <p>
     * The index and term placed in the header are those of the entry at the commit index, read before
     * {@code snapshot()} is invoked.
     *
     * @param dest the destination
     * @throws IllegalStateException if the log has no entry at the commit index; thrown before any
     *         snapshot is taken
     * @throws Exception             if taking or reading the snapshot fails
     */
    protected void sendSnapshotTo(Address dest) throws Exception {
        LogEntry last_committed_entry = this.log_impl.get(this.commitIndex());
        long last_index = this.commit_index;
        if (last_committed_entry == null) {
            // Fail with a descriptive message instead of a bare NullPointerException on the field access below
            throw new IllegalStateException(String.valueOf(this.local_addr) + ": cannot send snapshot to " + String.valueOf(dest) + ": no log entry found at commit index " + last_index);
        }
        long last_term = last_committed_entry.term;
        this.snapshot();
        ByteBuffer data = this.log_impl.getSnapshot();
        this.log.debug("%s: sending snapshot (%s) to %s", this.local_addr, Util.printBytes(data.position()), dest);
        Message msg = new BytesMessage(dest, data).putHeader(this.id, new InstallSnapshotRequest(this.currentTerm(), this.leader(), last_index, last_term));
        this.down_prot.down(msg);
    }

    /**
     * Applies committed entries up to {@code index_inclusive}, capped at {@code last_appended}, and
     * stores the resulting commit index in the log. {@code commit_index} never moves backwards.
     *
     * @param index_inclusive    highest index to commit (inclusive)
     * @param serialize_response passed through to {@link #applyCommits}
     * @return this instance
     */
    protected RAFT commitLogTo(long index_inclusive, boolean serialize_response) {
        final long upper_bound = Math.min(this.last_appended, index_inclusive);
        final long applied_up_to = this.applyCommits(upper_bound, serialize_response);
        final long new_commit_index = Math.max(this.commit_index, applied_up_to);
        this.commit_index = new_commit_index;
        this.log_impl.commitIndex(this.commit_index);
        return this;
    }

    /**
     * Appends {@code entries} to the log at {@code index}, unless {@code index} is not beyond
     * {@code last_appended}. After an append, {@code last_appended} is set to the value returned by the
     * log and a snapshot is triggered if the size threshold is reached.
     *
     * @param index   the index at which to append
     * @param entries the entries to append
     * @return {@code true} if the entries were appended, {@code false} if {@code index <= last_appended}
     */
    protected boolean append(long index, LogEntries entries) {
        final boolean beyond_log_end = index > this.last_appended;
        if (beyond_log_end) {
            this.last_appended = this.log_impl.append(index, entries);
            this.snapshotIfNeeded((int)entries.totalSize());
        }
        return beyond_log_end;
    }

    /**
     * Asks the log to delete all entries starting from {@code index}, then refreshes the cached
     * {@code last_appended} and {@code commit_index} from the log.
     *
     * @param index the index passed to the log's {@code deleteAllEntriesStartingFrom}
     */
    protected void deleteAllLogEntriesStartingFrom(long index) {
        this.log_impl.deleteAllEntriesStartingFrom(index);
        // Both cached values are re-read from the log rather than computed locally
        this.last_appended = this.log_impl.lastAppended();
        this.commit_index = this.log_impl.commitIndex();
    }

    /**
     * Adds {@code bytes_added} to {@code curr_log_size} and takes a snapshot once that size has reached
     * {@code max_log_size}. A failing snapshot is logged and not propagated.
     *
     * @param bytes_added number of bytes to add to the tracked log size
     */
    protected void snapshotIfNeeded(int bytes_added) {
        this.curr_log_size += (long)bytes_added;
        final boolean below_limit = this.curr_log_size < (long)this.max_log_size;
        if (below_limit) {
            return;
        }
        try {
            this.log.debug("%s: current log size is %d, exceeding max_log_size of %d: creating snapshot", this.local_addr, this.curr_log_size, this.max_log_size);
            this.snapshot();
        }
        catch (Exception snapshot_failure) {
            this.log.error("%s: failed snapshotting log: %s", this.local_addr, snapshot_failure);
        }
    }

    /**
     * Applies the entries from {@code commit_index + 1} up to {@code to_inclusive}, in order, stopping at
     * the first failure. The failure is logged and not propagated.
     *
     * @param to_inclusive       the last index to apply (inclusive)
     * @param serialize_response passed through to {@link #applyCommit}
     * @return the index of the last entry applied successfully, or the commit index at the time of the
     *         call if none was applied
     */
    protected long applyCommits(long to_inclusive, boolean serialize_response) {
        long applied = this.commit_index;
        for (long idx = applied + 1L; idx <= to_inclusive; ++idx) {
            try {
                this.applyCommit(idx, serialize_response);
            }
            catch (Throwable failure) {
                this.log.error("%s: failed moving commit_index to %d: %s", this.local_addr, to_inclusive, failure);
                break;
            }
            applied = idx;
        }
        return applied;
    }

    /**
     * Applies the log entry at {@code index} and notifies the pending request registered for it, if any.
     * <p>
     * An internal entry is unmarshalled into an {@link InternalCommand} and executed; any other entry is
     * passed to the state machine. A failure in either path is reported to the pending request, after
     * which the request is also notified with the response, which is {@code null} in that case and for
     * internal entries.
     *
     * @param index              the log index to apply
     * @param serialize_response whether the state machine should serialize its response; forced to
     *                           {@code false} when the request's options ask to ignore the return value
     * @throws IllegalStateException if the log has no entry at {@code index}
     * @throws Exception             declared for callers; see above for failures that are reported
     *                               instead of thrown
     */
    protected void applyCommit(long index, boolean serialize_response) throws Exception {
        final LogEntry committed = this.log_impl.get(index);
        if (committed == null) {
            throw new IllegalStateException(String.valueOf(this.local_addr) + ": log entry for index " + index + " not found in log");
        }
        final RequestTable.Entry<String> pending = this.request_table != null ? this.request_table.remove(index) : null;
        byte[] response = null;
        if (committed.internal) {
            try {
                final InternalCommand command = (InternalCommand)Util.streamableFromByteBuffer(InternalCommand.class, committed.command, committed.offset, committed.length);
                command.execute(this);
            }
            catch (Throwable failure) {
                RAFT.notify(pending, failure);
            }
        } else {
            // The options lookup stays outside the try: a failure here propagates to the caller
            final Options opts = pending != null ? pending.options() : null;
            final boolean ignore_result = opts != null && opts.ignoreReturnValue();
            final boolean serialize = serialize_response && !ignore_result;
            try {
                response = this.state_machine.apply(committed.command, committed.offset, committed.length, serialize);
            }
            catch (Throwable failure) {
                RAFT.notify(pending, failure);
            }
        }
        RAFT.notify(pending, response);
    }

    /**
     * Installs a new view. If a commit table exists it is adjusted to the new members, excluding
     * {@code local_addr}. When the new view is larger than the previous one it is checked for duplicate
     * raft-ids, and a duplicate is logged as an error.
     *
     * @param view the new view
     */
    public void handleView(View view) {
        // Only a view that grew compared to the previous one is checked for duplicate raft-ids
        final boolean view_grew = this.view != null && view.size() > this.view.size();
        this.view = view;
        if (this.commit_table != null) {
            final ArrayList<Address> others = new ArrayList<Address>(view.getMembers());
            others.remove(this.local_addr);
            this.commit_table.adjust(others, this.last_appended + 1L);
        }
        if (!view_grew) {
            return;
        }
        if (this.duplicatesInView(view)) {
            this.log.error("view contains duplicate raft-ids: %s", view);
        }
    }

    /**
     * Convenience overload of {@link #setLeaderAndTerm(Address, long)} that passes 0 as the term.
     *
     * @param new_leader the leader to set
     * @return the value returned by the two-argument overload
     */
    public RAFT setLeaderAndTerm(Address new_leader) {
        return this.setLeaderAndTerm(new_leader, 0L);
    }

    /**
     * Forwards the given term and leader to {@code raft_state.tryAdvanceTermAndLeader}; the result of
     * that call is not inspected here.
     *
     * @param new_leader the leader to set
     * @param new_term   the term to set
     * @return this instance
     */
    public RAFT setLeaderAndTerm(Address new_leader, long new_term) {
        this.raft_state.tryAdvanceTermAndLeader(new_term, new_leader);
        return this;
    }

    /**
     * Switches this node's role to match the new leader: Leader if {@code new_leader} equals
     * {@code local_addr}, Follower otherwise. Becoming leader is logged at debug level when the node was
     * not already the leader.
     *
     * @param new_leader the new leader's address; may be {@code null}
     */
    private void leaderUpdated(Address new_leader) {
        final boolean local_is_leader = Objects.equals(this.local_addr, new_leader);
        if (local_is_leader && !this.isLeader()) {
            this.log.debug("%s: becoming Leader", this.local_addr);
        }
        this.changeRole(local_is_leader ? Role.Leader : Role.Follower);
    }

    /**
     * Null-safe helper that passes a response to a request-table entry.
     *
     * @param e   the entry to notify; ignored when {@code null}
     * @param rsp the response handed to the entry
     */
    protected static <T> void notify(RequestTable.Entry<T> e, byte[] rsp) {
        if (e == null) {
            return;
        }
        e.notify(rsp);
    }

    /**
     * Null-safe helper that passes a failure to a request-table entry.
     *
     * @param e the entry to notify; ignored when {@code null}
     * @param t the failure handed to the entry
     */
    protected static <T> void notify(RequestTable.Entry<T> e, Throwable t) {
        if (e == null) {
            return;
        }
        e.notify(t);
    }

    /**
     * Installs the role implementation for {@code new_role}: a {@link Leader} for {@code Role.Leader},
     * a {@link Follower} for anything else.
     * <p>
     * Nothing changes if the installed implementation already has the same class. Otherwise the old
     * implementation, if any, is destroyed, the new one is initialized and installed, and the role change
     * listeners are notified.
     *
     * @param new_role the role to switch to
     * @return this instance
     */
    protected RAFT changeRole(Role new_role) {
        final RaftImpl candidate = new_role == Role.Leader ? new Leader(this) : new Follower(this);
        final RaftImpl current = this.impl;
        final boolean same_kind = current != null && current.getClass().equals(candidate.getClass());
        if (same_kind) {
            return this;
        }
        if (current != null) {
            current.destroy();
        }
        candidate.init();
        this.impl = candidate;
        final String previous_name = current == null ? "null" : current.getClass().getSimpleName();
        this.log.trace("%s: changed role from %s -> %s", this.local_addr, previous_name, candidate.getClass().getSimpleName());
        this.notifyRoleChangeListeners(new_role);
        return this;
    }

    /**
     * Executes an internal command against this instance. When {@code cmd} is {@code null}, the command
     * is first unmarshalled from {@code buf}. Unmarshalling and execution failures are logged and not
     * propagated.
     *
     * @param cmd    the command to run, or {@code null} to read it from {@code buf}
     * @param buf    buffer holding the marshalled command; only read when {@code cmd} is {@code null}
     * @param offset offset of the command within {@code buf}
     * @param length length of the command within {@code buf}
     */
    protected void executeInternalCommand(InternalCommand cmd, byte[] buf, int offset, int length) {
        InternalCommand to_run = cmd;
        if (to_run == null) {
            try {
                to_run = (InternalCommand)Util.streamableFromByteBuffer(InternalCommand.class, buf, offset, length);
            }
            catch (Exception unmarshal_failure) {
                this.log.error("%s: failed unmarshalling internal command: %s", this.local_addr, unmarshal_failure);
                return;
            }
        }
        try {
            to_run.execute(this);
        }
        catch (Exception execution_failure) {
            this.log.error("%s: failed executing internal command %s: %s", this.local_addr, to_run, execution_failure);
        }
    }

    /**
     * Builds the file name of a log from a base name and a suffix.
     * <p>
     * A leading {@code "."} is added to {@code suffix} if missing. A {@code name} that is not an absolute
     * path is placed under {@code log_dir}. The suffix is appended unless {@code name} already ends
     * with it.
     *
     * @param name   the base name, or an absolute path
     * @param suffix the suffix, with or without a leading dot
     * @return the resulting file name
     */
    protected String createLogName(String name, String suffix) {
        if (!suffix.startsWith(".")) {
            suffix = "." + suffix;
        }
        boolean needs_suffix = !name.endsWith(suffix);
        // Typed as String: with an Object local, the conditional below does not type-check against the return type
        String retval = name;
        if (!new File(name).isAbsolute()) {
            retval = this.log_dir + File.separator + name;
        }
        return needs_suffix ? retval + suffix : retval;
    }

    /**
     * Invokes every registered role change listener with {@code role}. A listener that throws does not
     * prevent the remaining listeners from being called; its failure is logged.
     *
     * @param role the role passed to each listener
     */
    protected void notifyRoleChangeListeners(Role role) {
        for (RoleChange ch : this.role_change_listeners) {
            try {
                ch.roleChanged(role);
            }
            catch (Throwable t) {
                // Still best-effort, but no longer silent: record which listener failed and why
                String listener = ch != null ? ch.getClass().getName() : "null";
                this.log.error("%s: role change listener %s failed for role %s: %s", this.local_addr, listener, role, t);
            }
        }
    }

    /**
     * Checks whether two members of {@code view} carry the same raft-id.
     * <p>
     * The raft-id is read from each {@code ExtendedUUID} address under {@code raft_id_key}. Addresses of
     * another type, and addresses without a raft-id, are logged and skipped.
     *
     * @param view the view to inspect
     * @return {@code true} as soon as a raft-id is seen a second time, {@code false} otherwise
     */
    protected boolean duplicatesInView(View view) {
        final HashSet<String> seen_ids = new HashSet<String>();
        for (Address member : view) {
            if (member instanceof ExtendedUUID) {
                final byte[] raw_id = ((ExtendedUUID)member).get(raft_id_key);
                final String id = raw_id != null ? Util.bytesToString(raw_id) : null;
                if (id == null) {
                    this.log.error("address %s doesn't have a raft-id", member);
                } else if (!seen_ids.add(id)) {
                    return true;
                }
            } else {
                this.log.warn("address %s is not an ExtendedUUID but a %s", member, member.getClass().getSimpleName());
            }
        }
        return false;
    }

    /**
     * Matches one {@code key=value} pair with optional surrounding whitespace and an optional trailing
     * comma. Compiled once; {@link Pattern} instances are immutable and safe to share.
     */
    private static final Pattern COMMA_DELIMITED_PROP = Pattern.compile("\\s*([^=\\s]+)\\s*=\\s*([^=\\s,]+)\\s*,?");

    /**
     * Parses a string of the form {@code "k1=v1, k2=v2"} into a map.
     * <p>
     * Whitespace around keys, values and {@code =} is ignored. Text that does not form a
     * {@code key=value} pair is skipped; a repeated key keeps its last value.
     *
     * @param s the string to parse; may be {@code null}
     * @return a new mutable map with the parsed pairs (empty if none were found), or {@code null} if
     *         {@code s} is {@code null}
     */
    protected static Map<String, String> parseCommaDelimitedProps(String s) {
        if (s == null) {
            return null;
        }
        Map<String, String> props = new HashMap<String, String>();
        Matcher matcher = COMMA_DELIMITED_PROP.matcher(s);
        while (matcher.find()) {
            props.put(matcher.group(1), matcher.group(2));
        }
        return props;
    }

    /**
     * Recomputes {@code majority} as more than half of the members listed in {@code internal_state}:
     * {@code size / 2 + 1}, using integer division.
     */
    protected void computeMajority() {
        final int member_count = this.internal_state.getMembers().size();
        this.majority = member_count / 2 + 1;
    }

    // Registers this protocol under id 521 and the RAFT header/payload classes under ids 2000-2004 with
    // ClassConfigurator when the class is initialized.
    // NOTE(review): presumably these ids must be identical on every node of a cluster - confirm.
    static {
        ClassConfigurator.addProtocol((short)521, RAFT.class);
        ClassConfigurator.add((short)2000, AppendEntriesRequest.class);
        ClassConfigurator.add((short)2001, AppendEntriesResponse.class);
        ClassConfigurator.add((short)2002, AppendResult.class);
        ClassConfigurator.add((short)2003, InstallSnapshotRequest.class);
        ClassConfigurator.add((short)2004, LogEntries.class);
    }

    protected static class UpRequest
    extends Request {
        private final Message msg;
        private final RaftHeader hdr;

        public UpRequest(Message msg, RaftHeader hdr) {
            this.msg = msg;
            this.hdr = hdr;
        }

        public String toString() {
            return String.format("%s %s", UpRequest.class.getSimpleName(), this.hdr);
        }
    }

    /**
     * Common base type of the queued requests; extended by {@link UpRequest} and {@link DownRequest}.
     * It declares no state of its own.
     */
    protected static class Request {
        protected Request() {
        }
    }

    protected static class DownRequest
    extends Request {
        final CompletableFuture<byte[]> f;
        final byte[] buf;
        final int offset;
        final int length;
        final boolean internal;
        final Options options;

        public DownRequest(CompletableFuture<byte[]> f, byte[] buf, int offset, int length, boolean internal, Options opts) {
            this.f = f;
            this.buf = buf;
            this.offset = offset;
            this.length = length;
            this.internal = internal;
            this.options = opts;
        }

        public String toString() {
            return String.format("%s %d bytes", DownRequest.class.getSimpleName(), this.length);
        }
    }

    /**
     * Callback for role changes; the role change listeners of the enclosing class are of this type.
     */
    public static interface RoleChange {
        /**
         * Called with the role that was switched to.
         *
         * @param var1 the new role
         */
        public void roleChanged(Role var1);
    }
}

