/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft;

import java.util.Objects;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.HeartbeatRequest;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.RaftHeader;
import org.jgroups.protocols.raft.Role;
import org.jgroups.protocols.raft.VoteRequest;
import org.jgroups.protocols.raft.VoteResponse;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.TimeScheduler;

@MBean(description="Protocol performing leader election according to the RAFT paper")
public class ELECTION
extends Protocol {
    protected static final short ELECTION_ID = 520;
    protected static final short VOTE_REQ = 3000;
    protected static final short VOTE_RSP = 3001;
    protected static final short HEARTBEAT_REQ = 3002;
    @Property(description="Interval (in ms) at which a leader sends out heartbeats")
    protected long heartbeat_interval = 30L;
    @Property(description="Min election interval (ms)")
    protected long election_min_interval = 150L;
    @Property(description="Max election interval (ms). The actual election interval is computed as a random value in range [election_min_interval..election_max_interval]")
    protected long election_max_interval = 300L;
    protected Address voted_for;
    @ManagedAttribute(description="Number of votes this candidate received in the current term")
    protected int current_votes;
    @ManagedAttribute(description="No election will ever be started if true; this node will always be a follower. Used only for testing and may get removed. Don't use !")
    protected boolean no_elections;
    protected volatile boolean heartbeat_received = true;
    protected RAFT raft;
    protected Address local_addr;
    protected TimeScheduler timer;
    protected Future<?> election_task;
    protected Future<?> heartbeat_task;
    protected Role role = Role.Follower;

    public long heartbeatInterval() {
        return this.heartbeat_interval;
    }

    public ELECTION heartbeatInterval(long val) {
        this.heartbeat_interval = val;
        return this;
    }

    public long electionMinInterval() {
        return this.election_min_interval;
    }

    public ELECTION electionMinInterval(long val) {
        this.election_min_interval = val;
        return this;
    }

    public long electionMaxInterval() {
        return this.election_max_interval;
    }

    public ELECTION electionMaxInterval(long val) {
        this.election_max_interval = val;
        return this;
    }

    public boolean noElections() {
        return this.no_elections;
    }

    public ELECTION noElections(boolean flag) {
        this.no_elections = flag;
        return this;
    }

    @ManagedAttribute(description="The current role")
    public String role() {
        return this.role.toString();
    }

    @ManagedAttribute(description="Is the heartbeat task running")
    public synchronized boolean isHeartbeatTaskRunning() {
        return this.heartbeat_task != null && !this.heartbeat_task.isDone();
    }

    @ManagedAttribute(description="Is the election ttimer running")
    public synchronized boolean isElectionTimerRunning() {
        return this.election_task != null && !this.election_task.isDone();
    }

    public void init() throws Exception {
        super.init();
        if (this.heartbeat_interval < 1L) {
            throw new Exception(String.format("heartbeat_interval (%d) must not be below one", this.heartbeat_interval));
        }
        if (this.heartbeat_interval >= this.election_min_interval) {
            throw new Exception(String.format("heartbeat_interval (%d) needs to be smaller than election_min_interval (%d)", this.heartbeat_interval, this.election_min_interval));
        }
        if (this.election_min_interval >= this.election_max_interval) {
            throw new Exception(String.format("election_min_interval (%d) needs to be smaller than election_max_interval (%d)", this.election_min_interval, this.election_max_interval));
        }
        this.timer = this.getTransport().getTimer();
        this.raft = this.findProtocol(RAFT.class);
    }

    public Object down(Event evt) {
        switch (evt.getType()) {
            case 2: 
            case 80: 
            case 92: 
            case 93: {
                Object retval = this.down_prot.down(evt);
                this.startElectionTimer();
                return retval;
            }
            case 4: {
                this.changeRole(Role.Follower);
                this.stopElectionTimer();
                break;
            }
            case 8: {
                this.local_addr = (Address)evt.getArg();
                break;
            }
            case 6: {
                this.handleView((View)evt.getArg());
            }
        }
        return this.down_prot.down(evt);
    }

    public Object up(Event evt) {
        if (evt.getType() == 6) {
            this.handleView((View)evt.getArg());
        }
        return this.up_prot.up(evt);
    }

    public Object up(Message msg) {
        RaftHeader hdr = (RaftHeader)msg.getHeader(this.id);
        if (hdr != null) {
            this.handleEvent(msg, hdr);
            return null;
        }
        return this.up_prot.up(msg);
    }

    public void up(MessageBatch batch) {
        for (Message msg : batch) {
            RaftHeader hdr = (RaftHeader)msg.getHeader(this.id);
            if (hdr == null) continue;
            batch.remove(msg);
            this.handleEvent(msg, hdr);
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    protected void handleView(View v) {
        if (this.role == Role.Leader) {
            if (v != null && v.size() < this.raft.majority()) {
                this.changeRole(Role.Candidate);
            }
        } else if (v != null && v.size() < this.raft.majority()) {
            this.raft.leader(null);
        }
    }

    protected void handleEvent(Message msg, RaftHeader hdr) {
        VoteResponse rsp;
        int rc = this.raft.currentTerm(hdr.term);
        if (rc < 0) {
            return;
        }
        if (rc > 0) {
            this.changeRole(Role.Follower);
            this.voteFor(null);
        }
        if (hdr instanceof HeartbeatRequest) {
            HeartbeatRequest hb = (HeartbeatRequest)hdr;
            this.handleHeartbeat(hb.term(), hb.leader);
        } else if (hdr instanceof VoteRequest) {
            VoteRequest header = (VoteRequest)hdr;
            this.handleVoteRequest(msg.src(), header.term(), header.lastLogTerm(), header.lastLogIndex());
        } else if (hdr instanceof VoteResponse && (rsp = (VoteResponse)hdr).result()) {
            this.handleVoteResponse(rsp.term());
        }
    }

    protected synchronized void handleHeartbeat(int term, Address leader) {
        if (Objects.equals(this.local_addr, leader)) {
            return;
        }
        this.heartbeatReceived(true);
        if (this.role != Role.Follower || this.raft.updateTermAndLeader(term, leader)) {
            this.changeRole(Role.Follower);
            this.voteFor(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void handleVoteRequest(Address sender, int term, int last_log_term, int last_log_index) {
        if (this.log.isTraceEnabled()) {
            this.log.trace("%s: received VoteRequest from %s: term=%d, my term=%d, last_log_term=%d, last_log_index=%d", new Object[]{this.local_addr, sender, term, this.raft.currentTerm(), last_log_term, last_log_index});
        }
        boolean send_vote_rsp = false;
        ELECTION eLECTION = this;
        synchronized (eLECTION) {
            if (this.voteFor(sender)) {
                if (this.sameOrNewer(last_log_term, last_log_index)) {
                    send_vote_rsp = true;
                } else {
                    this.log.trace("%s: dropped VoteRequest from %s as my log is more up-to-date", new Object[]{this.local_addr, sender});
                }
            } else {
                this.log.trace("%s: already voted for %s in term %d; skipping vote", new Object[]{this.local_addr, sender, term});
            }
        }
        if (send_vote_rsp) {
            this.sendVoteResponse(sender, term);
        }
    }

    protected synchronized void handleVoteResponse(int term) {
        if (this.role == Role.Candidate && term == this.raft.current_term && ++this.current_votes >= this.raft.majority) {
            this.log.trace("%s: collected %d votes (majority=%d) in term %d -> becoming leader", new Object[]{this.local_addr, this.current_votes, this.raft.majority, term});
            this.changeRole(Role.Leader);
        }
    }

    protected synchronized void handleElectionTimeout() {
        this.log.trace("%s: election timeout", new Object[]{this.local_addr});
        switch (this.role) {
            case Follower: {
                this.changeRole(Role.Candidate);
                this.startElection();
                break;
            }
            case Candidate: {
                this.startElection();
            }
        }
    }

    protected boolean sameOrNewer(int last_log_term, int last_log_index) {
        int my_last_log_index = this.raft.log().lastAppended();
        LogEntry entry = this.raft.log().get(my_last_log_index);
        int my_last_log_term = entry != null ? entry.term : 0;
        int comp = Integer.compare(my_last_log_term, last_log_term);
        return comp <= 0 && (comp < 0 || my_last_log_index <= last_log_index);
    }

    protected synchronized boolean heartbeatReceived(boolean flag) {
        boolean retval = this.heartbeat_received;
        this.heartbeat_received = flag;
        return retval;
    }

    protected void sendHeartbeat(int term, Address leader) {
        Message req = new Message(null).putHeader(this.id, (Header)new HeartbeatRequest(term, leader)).setFlag(new Message.Flag[]{Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY, Message.Flag.NO_FC}).setTransientFlag(new Message.TransientFlag[]{Message.TransientFlag.DONT_LOOPBACK});
        this.down_prot.down(req);
    }

    protected void sendVoteRequest(int term) {
        int last_log_index = this.raft.log().lastAppended();
        LogEntry entry = this.raft.log().get(last_log_index);
        int last_log_term = entry != null ? entry.term() : 0;
        VoteRequest req = new VoteRequest(term, last_log_term, last_log_index);
        this.log.trace("%s: sending %s", new Object[]{this.local_addr, req});
        Message vote_req = new Message(null).putHeader(this.id, (Header)req).setFlag(new Message.Flag[]{Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY, Message.Flag.NO_FC});
        this.down_prot.down(vote_req);
    }

    protected void sendVoteResponse(Address dest, int term) {
        VoteResponse rsp = new VoteResponse(term, true);
        this.log.trace("%s: sending %s", new Object[]{this.local_addr, rsp});
        Message vote_rsp = new Message(dest).putHeader(this.id, (Header)rsp).setFlag(new Message.Flag[]{Message.Flag.OOB, Message.Flag.INTERNAL, Message.Flag.NO_RELIABILITY, Message.Flag.NO_FC});
        this.down_prot.down(vote_rsp);
    }

    protected void changeRole(Role new_role) {
        if (this.role == new_role) {
            return;
        }
        if (this.role != Role.Leader && new_role == Role.Leader) {
            this.raft.leader(this.local_addr);
            this.sendHeartbeat(this.raft.currentTerm(), this.raft.leader());
            this.stopElectionTimer();
            this.startHeartbeatTimer();
        } else if (this.role == Role.Leader && new_role != Role.Leader) {
            this.stopHeartbeatTimer();
            this.startElectionTimer();
            this.raft.leader(null);
        }
        this.role = new_role;
        this.raft.changeRole(this.role);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void startElection() {
        int new_term = 0;
        ELECTION eLECTION = this;
        synchronized (eLECTION) {
            new_term = this.raft.createNewTerm();
            this.voteFor(null);
            this.current_votes = 0;
            if (!this.voteFor(this.local_addr)) {
                return;
            }
        }
        this.sendVoteRequest(new_term);
    }

    @ManagedAttribute(description="Vote cast for a candidate in the current term")
    public synchronized String votedFor() {
        return this.voted_for != null ? this.voted_for.toString() : null;
    }

    protected boolean voteFor(Address addr) {
        if (addr == null) {
            this.voted_for = null;
            return true;
        }
        if (this.voted_for == null) {
            this.voted_for = addr;
            return true;
        }
        return this.voted_for.equals(addr);
    }

    protected void startElectionTimer() {
        if (!this.no_elections && (this.election_task == null || this.election_task.isDone())) {
            this.election_task = this.timer.scheduleWithDynamicInterval((TimeScheduler.Task)new ElectionTask());
        }
    }

    protected void stopElectionTimer() {
        if (this.election_task != null) {
            this.election_task.cancel(true);
        }
    }

    protected void startHeartbeatTimer() {
        if (this.heartbeat_task == null || this.heartbeat_task.isDone()) {
            this.heartbeat_task = this.timer.scheduleAtFixedRate((Runnable)new HeartbeatTask(), this.heartbeat_interval, this.heartbeat_interval, TimeUnit.MILLISECONDS);
        }
    }

    protected void stopHeartbeatTimer() {
        if (this.heartbeat_task != null) {
            this.heartbeat_task.cancel(true);
        }
    }

    protected <T extends Protocol> T findProtocol(Class<T> clazz) {
        for (Protocol p = this.up_prot; p != null; p = p.getUpProtocol()) {
            if (!clazz.isAssignableFrom(p.getClass())) continue;
            return (T)p;
        }
        throw new IllegalStateException(clazz.getSimpleName() + " not found above " + ((Object)((Object)this)).getClass().getSimpleName());
    }

    static {
        ClassConfigurator.addProtocol((short)520, ELECTION.class);
        ClassConfigurator.add((short)3000, VoteRequest.class);
        ClassConfigurator.add((short)3001, VoteResponse.class);
        ClassConfigurator.add((short)3002, HeartbeatRequest.class);
    }

    protected class HeartbeatTask
    implements Runnable {
        protected HeartbeatTask() {
        }

        @Override
        public void run() {
            ELECTION.this.sendHeartbeat(ELECTION.this.raft.currentTerm(), ELECTION.this.raft.leader());
        }
    }

    protected class ElectionTask
    implements TimeScheduler.Task {
        protected ElectionTask() {
        }

        public long nextInterval() {
            return this.computeElectionTimeout(ELECTION.this.election_min_interval, ELECTION.this.election_max_interval);
        }

        public void run() {
            if (!ELECTION.this.heartbeatReceived(false)) {
                ELECTION.this.handleElectionTimeout();
            }
        }

        protected long computeElectionTimeout(long min, long max) {
            long diff = max - min;
            return (long)((int)(Math.random() * 100000.0 % (double)diff)) + min;
        }
    }
}

