/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.kubernetes.cluster.lock;

import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.CamelContext;
import org.apache.camel.Service;
import org.apache.camel.component.kubernetes.cluster.lock.KubernetesClusterEventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the most recently reported cluster leader and member set and forwards
 * changes to a {@link KubernetesClusterEventHandler}.
 *
 * <p>A reported leader is considered valid until {@code timestamp + lease}
 * (epoch milliseconds); once that instant has passed without a newer call to
 * {@link #refreshLeadership}, the handler is told that there is no leader.
 *
 * <p>Threading: the "current" state ({@code currentLeader}, {@code currentMembers},
 * {@code timestamp}, {@code lease}, {@code changeCounter}) is guarded by {@code lock}.
 * The "last communicated" state is only touched by {@code checkAndNotify}, which is
 * only ever run from tasks submitted to the single-thread executor created in
 * {@link #start()}.
 */
public class TimedLeaderNotifier implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(TimedLeaderNotifier.class);

    /**
     * Slack, in milliseconds, added after the lease expiry before the expiration
     * check runs; also used as the minimum scheduling delay.
     */
    private static final long FIXED_DELAY = 10L;

    private final CamelContext camelContext;
    private final KubernetesClusterEventHandler handler;
    private final Lock lock = new ReentrantLock();

    /** Written by start()/stop(), read by other threads, hence volatile. */
    private volatile ScheduledExecutorService executor;

    private Optional<String> lastCommunicatedLeader = Optional.empty();
    private Set<String> lastCommunicatedMembers = Collections.emptySet();

    private Optional<String> currentLeader = Optional.empty();
    private Set<String> currentMembers;
    private Long timestamp;
    private Long lease;
    /** Incremented on every refresh; lets queued tasks detect that they are stale. */
    private long changeCounter;

    /**
     * @param camelContext context used to obtain the scheduled executor in {@link #start()}
     * @param handler receiver of leader and member change notifications
     * @throws NullPointerException if either argument is {@code null}
     */
    public TimedLeaderNotifier(CamelContext camelContext, KubernetesClusterEventHandler handler) {
        this.camelContext = Objects.requireNonNull(camelContext, "Camel context must be present");
        this.handler = Objects.requireNonNull(handler, "Handler must be present");
    }

    /**
     * Records a new leadership snapshot, queues a notification check for it and,
     * when a leader is present, schedules a check for the end of its lease.
     *
     * @param leader the current leader, or {@link Optional#empty()} if there is none
     * @param timestamp epoch milliseconds from which the lease is counted; required when a leader is present
     * @param lease lease duration in milliseconds; required when a leader is present
     * @param members the current cluster members (use an empty set for none)
     * @throws NullPointerException if {@code leader} or {@code members} is {@code null}, if a leader is
     *         present without {@code timestamp} or {@code lease}, or if the notifier has not been started
     */
    public void refreshLeadership(Optional<String> leader, Long timestamp, Long lease, Set<String> members) {
        Objects.requireNonNull(leader, "leader must be non null (use Optional.empty)");
        Objects.requireNonNull(members, "members must be non null (use empty set)");
        if (leader.isPresent()) {
            // Validate before publishing anything: these values are unboxed below and by the
            // queued tasks, so failing late would leave half-applied state behind.
            Objects.requireNonNull(timestamp, "timestamp must be non null when a leader is present");
            Objects.requireNonNull(lease, "lease must be non null when a leader is present");
        }
        // Capture once so a concurrent stop() cannot null the field between the two submissions.
        ScheduledExecutorService scheduler
                = Objects.requireNonNull(this.executor, "notifier must be started before refreshing leadership");

        final long version;
        // Acquire outside the try so that unlock() only runs when the lock is actually held.
        this.lock.lock();
        try {
            this.currentLeader = leader;
            this.currentMembers = members;
            this.timestamp = timestamp;
            this.lease = lease;
            version = ++this.changeCounter;
        } finally {
            this.lock.unlock();
        }

        LOG.debug("Updated leader to {} at version {}", leader, version);
        scheduler.execute(() -> this.checkAndNotify(version));
        if (leader.isPresent()) {
            long time = System.currentTimeMillis();
            long delay = Math.max(timestamp + lease + FIXED_DELAY - time, FIXED_DELAY);
            LOG.debug("Setting expiration in {} millis for version {}", delay, version);
            scheduler.schedule(() -> this.expiration(version), delay, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Creates the single-thread scheduled executor through the Camel context's
     * executor service manager, unless one already exists.
     */
    @Override
    public void start() {
        if (this.executor == null) {
            this.executor = this.camelContext.getExecutorServiceManager()
                    .newSingleThreadScheduledExecutor(this, "CamelKubernetesLeaderNotifier");
        }
    }

    /**
     * Shuts the executor down immediately and waits up to one second for it to
     * terminate. Does nothing if the notifier is not started.
     */
    @Override
    public void stop() {
        ScheduledExecutorService scheduler = this.executor;
        if (scheduler == null) {
            return;
        }
        this.executor = null;
        scheduler.shutdownNow();
        try {
            scheduler.awaitTermination(1L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            LOG.info("Interrupted while waiting for thread termination");
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Scheduled at the expected end of a lease. Ignored if a newer refresh has
     * happened; re-scheduled if the lease has not actually elapsed yet; otherwise
     * triggers a notification check.
     */
    private void expiration(long version) {
        this.lock.lock();
        try {
            if (version != this.changeCounter) {
                // A newer snapshot exists and has scheduled its own expiration.
                return;
            }
            long time = System.currentTimeMillis();
            if (time < this.timestamp + this.lease) {
                long delay = this.timestamp + this.lease - time + FIXED_DELAY;
                LOG.debug("Delaying expiration by {} millis at version {}", delay, version);
                ScheduledExecutorService scheduler = this.executor;
                if (scheduler != null) {
                    // null means stop() ran concurrently; nothing left to schedule on.
                    scheduler.schedule(() -> this.expiration(version), delay, TimeUnit.MILLISECONDS);
                }
                return;
            }
        } finally {
            this.lock.unlock();
        }
        this.checkAndNotify(version);
    }

    /**
     * Compares the snapshot identified by {@code version} with what was last sent
     * to the handler and sends a notification for each part (leader, members)
     * that differs. Does nothing if {@code version} is no longer the latest.
     * Exceptions thrown by the handler are logged and not propagated.
     */
    private void checkAndNotify(long version) {
        Optional<String> leader;
        Set<String> members;
        this.lock.lock();
        try {
            if (version != this.changeCounter) {
                return;
            }
            leader = this.currentLeader;
            members = this.currentMembers;
            if (leader.isPresent() && System.currentTimeMillis() >= this.timestamp + this.lease) {
                // Lease elapsed without a refresh: report "no leader".
                leader = Optional.empty();
            }
        } finally {
            this.lock.unlock();
        }

        // NOTE(review): this file is decompiler output and both notifications below pass a
        // bare lambda; an explicit target-type cast may have been lost - confirm against
        // KubernetesClusterEventHandler.
        final Optional<String> newLeader = leader;
        if (!newLeader.equals(this.lastCommunicatedLeader)) {
            this.lastCommunicatedLeader = newLeader;
            LOG.info("The cluster has a new leader: {}", newLeader);
            try {
                this.handler.onKubernetesClusterEvent(() -> newLeader);
            } catch (Exception e) {
                LOG.warn("Error while communicating the new leader to the handler", e);
            }
        }

        final Set<String> newMembers = members;
        if (!newMembers.equals(this.lastCommunicatedMembers)) {
            this.lastCommunicatedMembers = newMembers;
            LOG.info("The list of cluster members has changed: {}", newMembers);
            try {
                this.handler.onKubernetesClusterEvent(() -> newMembers);
            } catch (Exception e) {
                LOG.warn("Error while communicating the cluster members to the handler", e);
            }
        }
    }
}

