/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.raft.election;

import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import org.jgroups.Address;
import org.jgroups.EmptyMessage;
import org.jgroups.Event;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.protocols.raft.Log;
import org.jgroups.protocols.raft.LogEntry;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.RaftHeader;
import org.jgroups.protocols.raft.election.LeaderElected;
import org.jgroups.protocols.raft.election.VoteRequest;
import org.jgroups.protocols.raft.election.VoteResponse;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.ResponseCollector;
import org.jgroups.util.Runner;

public abstract class BaseElection
extends Protocol {
    protected static final short VOTE_REQ = 3000;
    static final short VOTE_RSP = 3001;
    static final short LEADER_ELECTED = 3005;
    protected static final short PRE_VOTE_REQ = 3006;
    protected static final short PRE_VOTE_RSP = 3007;
    protected RAFT raft;
    private final Runner voting_thread = new Runner("voting-thread", this::runVotingProcess, null);
    private final ResponseCollector<VoteResponse> votes = new ResponseCollector();
    protected volatile View view;
    @Property(description="Max time (ms) to wait for vote responses", type=AttributeType.TIME)
    protected long vote_timeout = 600L;
    @ManagedAttribute(description="Number of voting rounds initiated by the coordinator")
    protected int num_voting_rounds;

    public long voteTimeout() {
        return this.vote_timeout;
    }

    public BaseElection voteTimeout(long timeoutMs) {
        if (timeoutMs <= 0L) {
            throw new IllegalArgumentException("Timeout should be greater than 0.");
        }
        this.vote_timeout = timeoutMs;
        return this;
    }

    public RAFT raft() {
        return this.raft;
    }

    public BaseElection raft(RAFT r) {
        this.raft = r;
        return this;
    }

    public ResponseCollector<VoteResponse> getVotes() {
        return this.votes;
    }

    @ManagedAttribute(description="Is the voting thread (only on the coordinator) running?")
    public synchronized boolean isVotingThreadRunning() {
        return this.voting_thread.isRunning();
    }

    @ManagedOperation(description="Trigger the voting process (only on the coordinator)")
    public boolean runVotingThread() {
        if (this.isViewCoordinator()) {
            this.startVotingThread();
            return this.isVotingThreadRunning();
        }
        return false;
    }

    protected abstract void handleView(View var1);

    @Override
    public void resetStats() {
        super.resetStats();
        this.num_voting_rounds = 0;
    }

    @Override
    public void init() throws Exception {
        super.init();
        this.raft = RAFT.findProtocol(RAFT.class, this, false);
    }

    @Override
    public void stop() {
        this.stopVotingThread();
        if (this.raft != null) {
            this.raft.setLeaderAndTerm(null);
        }
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 4: {
                this.raft.setLeaderAndTerm(null);
                break;
            }
            case 6: {
                this.handleView((View)evt.getArg());
            }
        }
        return this.down_prot.down(evt);
    }

    @Override
    public Object up(Event evt) {
        if (evt.getType() == 6) {
            this.handleView((View)evt.getArg());
        }
        return this.up_prot.up(evt);
    }

    @Override
    public Object up(Message msg) {
        RaftHeader hdr = (RaftHeader)msg.getHeader(this.id);
        if (hdr != null) {
            this.handleMessage(msg, hdr);
            return null;
        }
        return this.up_prot.up(msg);
    }

    @Override
    public void up(MessageBatch batch) {
        Iterator<Message> it = batch.iterator();
        while (it.hasNext()) {
            Message msg = it.next();
            RaftHeader hdr = (RaftHeader)msg.getHeader(this.id);
            if (hdr == null) continue;
            it.remove();
            this.handleMessage(msg, hdr);
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    protected void handleMessage(Message msg, RaftHeader hdr) {
        if (hdr instanceof LeaderElected) {
            this.handleLeaderElected(msg, (LeaderElected)hdr);
            return;
        }
        if (hdr instanceof VoteRequest) {
            this.handleVoteRequest(msg, (VoteRequest)hdr);
            return;
        }
        if (hdr instanceof VoteResponse) {
            this.handleVoteResponse(msg, (VoteResponse)hdr);
        }
    }

    protected boolean isViewCoordinator() {
        return Objects.equals(this.view.getCoord(), this.local_addr);
    }

    private void handleLeaderElected(Message msg, LeaderElected hdr) {
        long term = hdr.currTerm();
        Address leader = hdr.leader();
        this.stopVotingThread();
        this.log.trace("%s <- %s: %s", this.local_addr, msg.src(), hdr);
        this.raft.setLeaderAndTerm(leader, term);
    }

    private void handleVoteRequest(Message msg, VoteRequest hdr) {
        Address sender = msg.src();
        long new_term = hdr.currTerm();
        if (this.log.isTraceEnabled()) {
            this.log.trace("%s <- %s: VoteRequest(term=%d)", this.local_addr, sender, new_term);
        }
        int result = this.raft.currentTerm(new_term);
        switch (result) {
            case -1: {
                this.log.trace("%s: received vote request from %s with term=%d (current_term=%d); dropping vote response", this.local_addr, sender, new_term, this.raft.currentTerm());
                return;
            }
            case 1: {
                this.log.trace("%s: received vote request from %s with higher term %d; accepting request", this.local_addr, sender, new_term);
                break;
            }
            case 0: {
                this.log.trace("%s: received vote request from %s at same term %d", this.local_addr, sender, new_term);
            }
        }
        Address voted_for = this.raft.votedFor();
        if (voted_for != null && !voted_for.equals(sender)) {
            this.log.trace("%s: already voted (for %s) in term %d; dropping vote request from %s", this.local_addr, voted_for, new_term, sender);
            return;
        }
        Log log_impl = this.raft.log();
        if (log_impl == null) {
            return;
        }
        this.raft.votedFor(sender);
        long my_last_index = log_impl.lastAppended();
        LogEntry entry = log_impl.get(my_last_index);
        long my_last_term = entry != null ? entry.term() : 0L;
        this.sendVoteResponse(sender, new_term, my_last_term, my_last_index);
    }

    private void handleVoteResponse(Message msg, VoteResponse hdr) {
        this.votes.add(msg.src(), hdr);
    }

    protected Address determineLeader() {
        Address leader = null;
        VoteResponse higher = null;
        Map<Address, VoteResponse> results = this.votes.getResults();
        for (Address mbr : this.view.getMembersRaw()) {
            VoteResponse rsp = results.get(mbr);
            if (rsp == null || !this.isHigher(higher, rsp)) continue;
            leader = mbr;
            higher = rsp;
        }
        return leader;
    }

    private boolean isHigher(VoteResponse one, VoteResponse other) {
        if (one == null) {
            return true;
        }
        if (one.last_log_term < other.last_log_term) {
            return true;
        }
        if (one.last_log_term > other.last_log_term) {
            return false;
        }
        return one.last_log_index < other.last_log_index;
    }

    protected void runVotingProcess() {
        if (!this.isMajorityAvailable()) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("%s: majority (%d) not available anymore (%s), stopping thread", this.local_addr, this.raft.majority(), this.view);
            }
            this.stopVotingThread();
            return;
        }
        long new_term = this.raft.createNewTerm();
        this.votes.reset(this.view.getMembersRaw());
        ++this.num_voting_rounds;
        long start = System.currentTimeMillis();
        this.sendVoteRequest(new_term);
        this.votes.waitForAllResponses(this.vote_timeout);
        long time = System.currentTimeMillis() - start;
        int majority = this.raft.majority();
        if (this.votes.numberOfValidResponses() >= majority) {
            Address leader = this.determineLeader();
            this.log.trace("%s: collected votes from %s in %d ms (majority=%d) -> leader is %s (new_term=%d)", this.local_addr, this.votes.getResults(), time, majority, leader, new_term);
            this.raft.setLeaderAndTerm(leader, new_term);
            this.sendLeaderElectedMessage(leader, new_term);
            this.stopVotingThread();
        } else {
            this.log.trace("%s: collected votes from %s in %d ms (majority=%d); starting another voting round", this.local_addr, this.votes.getValidResults(), time, majority);
        }
    }

    protected final boolean isMajorityAvailable() {
        return this.view != null && this.view.size() >= this.raft.majority();
    }

    public synchronized BaseElection startVotingThread() {
        if (!this.isVotingThreadRunning()) {
            this.voting_thread.start();
        }
        return this;
    }

    protected void sendVoteRequest(long new_term) {
        VoteRequest req = new VoteRequest(new_term);
        this.log.trace("%s -> all: %s", this.local_addr, req);
        Message vote_req = new EmptyMessage(null).putHeader(this.id, req).setFlag(Message.Flag.OOB);
        this.down_prot.down(vote_req);
    }

    protected void sendLeaderElectedMessage(Address leader, long term) {
        RaftHeader hdr = new LeaderElected(leader).currTerm(term);
        Message msg = new EmptyMessage(null).putHeader(this.id, hdr).setFlag(Message.TransientFlag.DONT_LOOPBACK);
        this.log.trace("%s -> all (-self): %s", this.local_addr, hdr);
        this.down_prot.down(msg);
    }

    protected void sendVoteResponse(Address dest, long term, long last_log_term, long last_log_index) {
        VoteResponse rsp = new VoteResponse(term, last_log_term, last_log_index);
        Message vote_rsp = new EmptyMessage(dest).putHeader(this.id, rsp).setFlag(Message.Flag.OOB);
        this.log.trace("%s -> %s: %s", this.local_addr, dest, rsp);
        this.down_prot.down(vote_rsp);
    }

    public synchronized BaseElection stopVotingThread() {
        if (this.isVotingThreadRunning()) {
            this.log.debug("%s: stopping the voting thread", this.local_addr);
            this.voting_thread.stop();
            this.votes.reset();
        }
        return this;
    }

    static {
        ClassConfigurator.add((short)3000, VoteRequest.class);
        ClassConfigurator.add((short)3001, VoteResponse.class);
        ClassConfigurator.add((short)3005, LeaderElected.class);
    }
}

