/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.clock;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshot;
import org.apache.ignite.internal.processors.clock.GridClockDeltaSnapshotMessage;
import org.apache.ignite.internal.processors.clock.GridClockDeltaVersion;
import org.apache.ignite.internal.processors.clock.GridClockMessage;
import org.apache.ignite.internal.processors.clock.GridClockServer;
import org.apache.ignite.internal.processors.clock.GridClockSource;
import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.thread.IgniteThread;

public class GridClockSyncProcessor
extends GridProcessorAdapter {
    /** Maximum number of snapshots kept in {@link #timeSyncHist}. */
    private static final int MAX_TIME_SYNC_HISTORY = 100;

    /** Time server; created and started in {@link #start()}. */
    private GridClockServer srv;

    /**
     * Lock coordinating shutdown with normal operation: write-locked by {@link #onKernalStop(boolean)},
     * read-locked while launching the coordinator and while publishing a snapshot.
     */
    private final GridSpinReadWriteLock rw = new GridSpinReadWriteLock();

    /** Flag consulted before a time coordinator is launched; when set, no coordinator is started. */
    private volatile boolean stopping;

    /** Coordinator worker running on this node, or {@code null} if none is running here. */
    private volatile TimeCoordinator timeCoord;

    /** Bounded history of delta snapshots keyed by snapshot version. */
    private final NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHist = new GridBoundedConcurrentOrderedMap<GridClockDeltaVersion, GridClockDeltaSnapshot>(MAX_TIME_SYNC_HISTORY);

    /** Most recently stored (version, snapshot) pair; checked first by {@link #adjustedTime(long)}. */
    private volatile T2<GridClockDeltaVersion, GridClockDeltaSnapshot> lastSnapshot;

    /** Source of the local clock readings; obtained from the kernal context in {@link #start()}. */
    private GridClockSource clockSrc;

    /**
     * Creates the processor. The context is simply passed to the {@link GridProcessorAdapter} constructor;
     * all other initialisation happens in {@link #start()}.
     *
     * @param ctx Kernal context.
     */
    public GridClockSyncProcessor(GridKernalContext ctx) {
        super(ctx);
    }

    /**
     * Starts the processor: captures the clock source, starts the clock server, registers the
     * time-sync message listener and the discovery event listener, and finally advertises the
     * clock server's host and port as node attributes.
     *
     * @throws IgniteCheckedException If the parent processor or the clock server fails to start.
     */
    @Override
    public void start() throws IgniteCheckedException {
        super.start();

        clockSrc = ctx.timeSource();

        srv = new GridClockServer();
        srv.start(ctx);

        // Every snapshot received on the time-sync topic becomes the "last" snapshot and is
        // appended to the bounded history.
        GridMessageListener snapLsnr = new GridMessageListener() {
            @Override
            public void onMessage(UUID nodeId, Object msg) {
                assert (msg instanceof GridClockDeltaSnapshotMessage);

                GridClockDeltaSnapshotMessage snapMsg = (GridClockDeltaSnapshotMessage)msg;
                GridClockDeltaVersion snapVer = snapMsg.snapshotVersion();
                GridClockDeltaSnapshot received = new GridClockDeltaSnapshot(snapVer, snapMsg.deltas());

                lastSnapshot = new T2<GridClockDeltaVersion, GridClockDeltaSnapshot>(snapVer, received);
                timeSyncHist.put(snapVer, received);
            }
        };

        ctx.io().addMessageListener(GridTopic.TOPIC_TIME_SYNC, snapLsnr);

        // NOTE(review): 10/11/12 are presumably EventType.EVT_NODE_JOINED / EVT_NODE_LEFT /
        // EVT_NODE_FAILED inlined by the compiler - confirm against EventType.
        GridLocalEventListener discoLsnr = new GridLocalEventListener() {
            @Override
            public void onEvent(Event evt) {
                int type = evt.type();

                assert (type == 11 || type == 12 || type == 10);

                DiscoveryEvent discoEvt = (DiscoveryEvent)evt;

                // Only events 11 and 12 can trigger a coordinator launch on this node.
                boolean mayNeedCoordinator = type == 11 || type == 12;

                if (mayNeedCoordinator) {
                    checkLaunchCoordinator(discoEvt);
                }

                // Read the volatile field once so the null check and the call see the same worker.
                TimeCoordinator coord = timeCoord;

                if (coord != null) {
                    coord.onDiscoveryEvent(discoEvt);
                }
            }
        };

        ctx.event().addLocalEventListener(discoLsnr, 11, 12, 10);

        ctx.addNodeAttribute("org.apache.ignite.time.host", srv.host());
        ctx.addNodeAttribute("org.apache.ignite.time.port", srv.port());
    }

    /**
     * Notifies the clock server that startup has finished and then evaluates the local join
     * event to decide whether this node must launch the time coordinator.
     *
     * @throws IgniteCheckedException If the parent callback fails.
     */
    @Override
    public void onKernalStart() throws IgniteCheckedException {
        super.onKernalStart();

        srv.afterStart();

        checkLaunchCoordinator(ctx.discovery().localJoinEvent());
    }

    /**
     * Stops time coordination on this node: marks the processor as stopping, cancels and joins
     * the coordinator worker (if one is running) and lets the clock server prepare for shutdown.
     * <p>
     * The whole sequence runs under the write lock, so it cannot interleave with
     * {@link #checkLaunchCoordinator(DiscoveryEvent)} or {@link #publish}, which take the read lock.
     *
     * @param cancel Cancel flag, forwarded to the parent implementation.
     */
    @Override
    public void onKernalStop(boolean cancel) {
        super.onKernalStop(cancel);

        rw.writeLock();

        try {
            // Must be raised (not cleared): checkLaunchCoordinator() tests this flag to refuse
            // starting a new coordinator once shutdown has begun. Previously it was assigned
            // false here, which left the guard permanently ineffective.
            stopping = true;

            TimeCoordinator coord = timeCoord;

            if (coord != null) {
                coord.cancel();

                U.join(coord, log);

                timeCoord = null;
            }

            if (srv != null) {
                srv.beforeStop();
            }
        }
        finally {
            rw.writeUnlock();
        }
    }

    /**
     * Stops the clock server, if one was created.
     *
     * @param cancel Cancel flag, forwarded to the parent implementation.
     * @throws IgniteCheckedException If the parent implementation fails.
     */
    @Override
    public void stop(boolean cancel) throws IgniteCheckedException {
        super.stop(cancel);

        GridClockServer server = srv;

        // Nothing to shut down when start() never got as far as creating the server.
        if (server == null) {
            return;
        }

        server.stop();
    }

    /**
     * Reads the current time from the configured clock source.
     *
     * @return Current time in milliseconds as reported by {@code clockSrc}.
     */
    private long currentTime() {
        GridClockSource src = clockSrc;

        return src.currentTimeMillis();
    }

    /**
     * Gives access to the recorded time-sync snapshots.
     *
     * @return The live history map itself (not a copy), keyed by snapshot version.
     */
    public NavigableMap<GridClockDeltaVersion, GridClockDeltaSnapshot> timeSyncHistory() {
        return timeSyncHist;
    }

    /**
     * Handles a clock message received by the time server.
     * <p>
     * A message originated by another node is a time request: it is stamped with the local
     * receive time and sent back to the given address. A message originated by the local node
     * is a reply to one of our own requests and is handed to the time coordinator.
     *
     * @param msg Received message.
     * @param addr Address to send the reply to.
     * @param port Port to send the reply to.
     */
    public void onMessageReceived(GridClockMessage msg, InetAddress addr, int port) {
        // Take the timestamp first so that processing time does not skew it.
        long rcvTs = currentTime();

        if (!msg.originatingNodeId().equals(ctx.localNodeId())) {
            msg.replyTimestamp(rcvTs);

            try {
                srv.sendPacket(msg, addr, port);
            }
            catch (IgniteCheckedException e) {
                U.error(log, "Failed to send time server reply to remote node: " + msg, e);
            }

            return;
        }

        // The coordinator reference is volatile and is cleared by onKernalStop(), so a late
        // reply may arrive when no coordinator exists. Read it once and skip the reply in that
        // case instead of failing with a NullPointerException.
        TimeCoordinator coord = timeCoord;

        if (coord != null) {
            coord.onMessage(msg, rcvTs);
        }
        else if (log.isDebugEnabled()) {
            log.debug("Ignoring time reply because time coordinator is not running: " + msg);
        }
    }

    /**
     * Starts the time coordinator thread on this node if the processor is not stopping, no
     * coordinator is running yet, and the local node has the lowest order among the nodes
     * listed in the discovery event.
     * <p>
     * Runs under the read lock, so it cannot overlap with the write-locked section of
     * {@link #onKernalStop(boolean)}.
     *
     * @param discoEvt Discovery event whose topology is inspected.
     */
    private void checkLaunchCoordinator(DiscoveryEvent discoEvt) {
        rw.readLock();

        try {
            if (stopping || timeCoord != null) {
                return;
            }

            // Lowest order present in the event's topology.
            long eldestOrder = Long.MAX_VALUE;

            for (ClusterNode candidate : discoEvt.topologyNodes()) {
                eldestOrder = Math.min(eldestOrder, candidate.order());
            }

            ClusterNode locNode = ctx.discovery().localNode();

            if (locNode.order() != eldestOrder) {
                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("Detected local node to be the eldest node in topology, starting time coordinator thread [discoEvt=" + discoEvt + ", locNode=" + locNode + ']');
            }

            // The read lock is shared, so re-check under the monitor to make sure only one
            // caller actually creates and starts the worker.
            synchronized (this) {
                if (timeCoord == null && !stopping) {
                    timeCoord = new TimeCoordinator(discoEvt);

                    IgniteThread coordThread = new IgniteThread(timeCoord);

                    coordThread.setPriority(Thread.MAX_PRIORITY);
                    coordThread.start();
                }
            }
        }
        finally {
            rw.readUnlock();
        }
    }

    /**
     * Returns the local clock reading corrected by the delta recorded for the local node.
     * <p>
     * The most recently stored snapshot is used when its topology version equals {@code topVer};
     * otherwise the history is queried for the entry strictly below version
     * {@code (0, topVer + 1)}. If no snapshot is found, or the snapshot holds no delta for the
     * local node, the unadjusted clock reading is returned.
     *
     * @param topVer Topology version to look the snapshot up for.
     * @return Current time in milliseconds plus the local node's delta, if one is known.
     */
    public long adjustedTime(long topVer) {
        GridClockDeltaSnapshot snap = null;

        // Volatile read taken once so that both halves of the pair come from the same update.
        T2<GridClockDeltaVersion, GridClockDeltaSnapshot> cached = lastSnapshot;

        if (cached != null && cached.get1().topologyVersion() == topVer) {
            snap = cached.get2();
        }
        else {
            // NOTE(review): presumably versions order by topology version first, which makes
            // this the newest snapshot at or before topVer - confirm in GridClockDeltaVersion.
            Map.Entry<GridClockDeltaVersion, GridClockDeltaSnapshot> below =
                timeSyncHistory().lowerEntry(new GridClockDeltaVersion(0L, topVer + 1L));

            if (below != null) {
                snap = below.getValue();
            }
        }

        long now = clockSrc.currentTimeMillis();

        if (snap == null) {
            return now;
        }

        Long delta = snap.deltas().get(ctx.localNodeId());

        return delta == null ? now : now + delta;
    }

    /**
     * Stores the given snapshot locally and sends it to every node of the given topology.
     * <p>
     * Does nothing if the read lock cannot be acquired immediately. A failed send is logged as
     * an error when the target node still answers a ping and at debug level otherwise; it never
     * interrupts delivery to the remaining nodes.
     *
     * @param snapshot Collected delta snapshot.
     * @param top Topology snapshot whose nodes receive the message.
     */
    private void publish(GridClockDeltaSnapshot snapshot, GridDiscoveryTopologySnapshot top) {
        if (!rw.tryReadLock()) {
            return;
        }

        try {
            GridClockDeltaVersion ver = snapshot.version();

            lastSnapshot = new T2<GridClockDeltaVersion, GridClockDeltaSnapshot>(ver, snapshot);
            timeSyncHist.put(ver, snapshot);

            for (ClusterNode target : top.topologyNodes()) {
                // A separate message instance is created for each recipient.
                GridClockDeltaSnapshotMessage msg = new GridClockDeltaSnapshotMessage(ver, snapshot.deltas());

                try {
                    ctx.io().send(target, GridTopic.TOPIC_TIME_SYNC, (Message)msg, (byte)2);
                }
                catch (IgniteCheckedException e) {
                    boolean reachable = ctx.discovery().pingNodeNoError(target.id());

                    if (reachable) {
                        U.error(log, "Failed to send time sync snapshot to remote node (did not leave grid?) [nodeId=" + target.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
                    }
                    else if (log.isDebugEnabled()) {
                        log.debug("Failed to send time sync snapshot to remote node (did not leave grid?) [nodeId=" + target.id() + ", msg=" + msg + ", err=" + e.getMessage() + ']');
                    }
                }
            }
        }
        finally {
            rw.readUnlock();
        }
    }

    /**
     * Worker that repeatedly collects clock deltas from the nodes of the latest known topology
     * and publishes the result through the enclosing processor.
     */
    private class TimeCoordinator
    extends GridWorker {
        /** Latest topology seen by this worker; replaced on every discovery event. */
        private volatile GridDiscoveryTopologySnapshot lastSnapshot;

        /** Snapshot currently being collected, or {@code null} between collection rounds. */
        private volatile GridClockDeltaSnapshot pendingSnapshot;

        /** Counter used as the first component of each new snapshot version; starts at 1. */
        private long verCnt = 1L;

        /**
         * Creates the worker and seeds its topology from the given event.
         *
         * @param evt Discovery event that triggered the launch.
         */
        protected TimeCoordinator(DiscoveryEvent evt) {
            super(GridClockSyncProcessor.this.ctx.gridName(), "grid-time-coordinator", GridClockSyncProcessor.this.log);

            this.lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());
        }

        /**
         * Main loop: creates a snapshot for the current topology, keeps requesting time from
         * the nodes still pending until the snapshot is ready, publishes it, and then waits
         * for the configured sync frequency unless the topology changed in the meantime.
         *
         * @throws InterruptedException If the thread is interrupted while waiting.
         * @throws IgniteInterruptedCheckedException Declared by the worker contract.
         */
        @Override
        protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
            while (!isCancelled()) {
                GridDiscoveryTopologySnapshot top = this.lastSnapshot;

                if (log.isDebugEnabled()) {
                    log.debug("Creating time sync snapshot for topology: " + top);
                }

                GridClockDeltaVersion ver = new GridClockDeltaVersion(verCnt++, top.topologyVersion());

                GridClockDeltaSnapshot snapshot = new GridClockDeltaSnapshot(
                    ver,
                    GridClockSyncProcessor.this.ctx.localNodeId(),
                    top,
                    GridClockSyncProcessor.this.ctx.config().getClockSyncSamples());

                // Published to the field so that replies and node-left notifications reach it.
                pendingSnapshot = snapshot;

                while (!snapshot.ready()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Requesting time from remote nodes: " + snapshot.pendingNodeIds());
                    }

                    for (UUID pendingId : snapshot.pendingNodeIds()) {
                        requestTime(pendingId);
                    }

                    if (log.isDebugEnabled()) {
                        log.debug("Waiting for snapshot to be ready: " + snapshot);
                    }

                    snapshot.awaitReady(1000L);
                }

                pendingSnapshot = null;

                if (log.isDebugEnabled()) {
                    log.debug("Collected time sync results: " + snapshot.deltas());
                }

                GridClockSyncProcessor.this.publish(snapshot, top);

                synchronized (this) {
                    // Skip the pause when a discovery event already replaced the topology,
                    // so the next round starts immediately.
                    if (top.topologyVersion() == this.lastSnapshot.topologyVersion()) {
                        wait(GridClockSyncProcessor.this.ctx.config().getClockSyncFrequency());
                    }
                }
            }
        }

        /**
         * Records the topology carried by the event and wakes the main loop. For event types
         * 11 and 12 the event node is first removed from the snapshot being collected.
         *
         * @param evt Discovery event.
         */
        public void onDiscoveryEvent(DiscoveryEvent evt) {
            if (log.isDebugEnabled()) {
                log.debug("Processing discovery event: " + evt);
            }

            switch (evt.type()) {
                case 11:
                case 12:
                    onNodeLeft(evt.eventNode().id());

                    break;

                default:
                    break;
            }

            synchronized (this) {
                this.lastSnapshot = new GridDiscoveryTopologySnapshot(evt.topologyVersion(), evt.topologyNodes());

                notifyAll();
            }
        }

        /**
         * Feeds a time reply into the snapshot being collected, if any, and issues another
         * request to the same node when the snapshot asks for more samples.
         *
         * @param msg Reply message.
         * @param rcvTs Local time at which the reply was received.
         */
        private void onMessage(GridClockMessage msg, long rcvTs) {
            GridClockDeltaSnapshot pending = pendingSnapshot;

            if (pending == null) {
                return;
            }

            // Midpoint between local send and local receive time, minus the remote timestamp.
            long midpoint = (msg.originatingTimestamp() + rcvTs) / 2L;
            long delta = midpoint - msg.replyTimestamp();

            UUID target = msg.targetNodeId();

            if (pending.onDeltaReceived(target, delta)) {
                requestTime(target);
            }
        }

        /**
         * Sends a time request to the clock server of the given node. If discovery no longer
         * knows the node, it is treated as having left.
         *
         * @param rmtNodeId Remote node ID.
         */
        private void requestTime(UUID rmtNodeId) {
            ClusterNode node = GridClockSyncProcessor.this.ctx.discovery().node(rmtNodeId);

            if (node == null) {
                onNodeLeft(rmtNodeId);

                return;
            }

            InetAddress addr = (InetAddress)node.attribute("org.apache.ignite.time.host");
            int port = (Integer)node.attribute("org.apache.ignite.time.port");

            try {
                GridClockMessage req = new GridClockMessage(
                    GridClockSyncProcessor.this.ctx.localNodeId(),
                    rmtNodeId,
                    GridClockSyncProcessor.this.currentTime(),
                    0L);

                GridClockSyncProcessor.this.srv.sendPacket(req, addr, port);
            }
            catch (IgniteCheckedException e) {
                LT.error(log, e, "Failed to send time request to remote node [rmtNodeId=" + rmtNodeId + ", addr=" + addr + ", port=" + port + ']');
            }
        }

        /**
         * Tells the snapshot being collected, if any, that the given node is gone.
         *
         * @param nodeId ID of the node that left.
         */
        private void onNodeLeft(UUID nodeId) {
            GridClockDeltaSnapshot pending = pendingSnapshot;

            if (pending != null) {
                pending.onNodeLeft(nodeId);
            }
        }
    }
}

