/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.master;

import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.StartupListener;
import org.apache.camel.SuspendableService;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
import org.apache.camel.cluster.CamelClusterEventListener;
import org.apache.camel.cluster.CamelClusterMember;
import org.apache.camel.cluster.CamelClusterService;
import org.apache.camel.cluster.CamelClusterView;
import org.apache.camel.component.master.MasterEndpoint;
import org.apache.camel.resume.ResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.resume.AdapterHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.backoff.BackOff;
import org.apache.camel.util.backoff.BackOffTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Consumer that wraps the consumer of a delegate endpoint and only runs it while the local
 * member of the cluster view reports itself as leader.
 *
 * <p>On start the consumer obtains a {@link CamelClusterView} for the endpoint's namespace and
 * registers a leadership listener. When leadership is taken, the delegate consumer is created
 * and started from a back-off timer task so that failed attempts are retried. When leadership
 * is lost, the delegate consumer and endpoint are stopped and shut down.
 */
@ManagedResource(description = "Managed Master Consumer")
public class MasterConsumer extends DefaultConsumer implements ResumeAware<ResumeStrategy> {
    private static final Logger LOG = LoggerFactory.getLogger(MasterConsumer.class);

    private final CamelClusterService clusterService;
    private final MasterEndpoint masterEndpoint;
    private final Endpoint delegatedEndpoint;
    private final Processor processor;
    private final CamelClusterEventListener.Leadership leadershipListener;
    // Written by the lifecycle methods, the leadership callbacks and the back-off task,
    // hence volatile; always copy into a local before a check-then-use sequence.
    private volatile Consumer delegatedConsumer;
    private volatile CamelClusterView view;
    private ResumeStrategy resumeStrategy;

    /**
     * Creates the consumer.
     *
     * @param masterEndpoint the master endpoint; its wrapped endpoint supplies the delegate consumer
     * @param processor      the processor handed to the delegate consumer when it is created
     * @param clusterService the cluster service used to obtain the view for leader election
     */
    public MasterConsumer(MasterEndpoint masterEndpoint, Processor processor, CamelClusterService clusterService) {
        super(masterEndpoint, processor);
        this.clusterService = clusterService;
        this.masterEndpoint = masterEndpoint;
        this.delegatedEndpoint = masterEndpoint.getEndpoint();
        this.processor = processor;
        this.leadershipListener = new LeadershipListener();
    }

    @Override
    public ResumeStrategy getResumeStrategy() {
        return resumeStrategy;
    }

    @Override
    public void setResumeStrategy(ResumeStrategy resumeStrategy) {
        this.resumeStrategy = resumeStrategy;
    }

    /**
     * Starts this consumer, obtains the cluster view for the endpoint's namespace and registers
     * the leadership listener on it.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        LOG.debug("Using ClusterService instance {} (id={}, type={})",
                clusterService, clusterService.getId(), clusterService.getClass().getName());
        view = clusterService.getView(masterEndpoint.getNamespace());
        view.addEventListener(leadershipListener);
    }

    /**
     * Stops this consumer, releases the cluster view and stops the delegate consumer and endpoint.
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        try {
            CamelClusterView currentView = view;
            if (currentView != null) {
                currentView.removeEventListener(leadershipListener);
                clusterService.releaseView(currentView);
                view = null;
            }
        } finally {
            // Stop the delegate even when releasing the view failed, so it is not left running.
            ServiceHelper.stopAndShutdownServices(delegatedConsumer, delegatedEndpoint);
            delegatedConsumer = null;
        }
    }

    /**
     * Resumes the delegate consumer first (when it is a {@link SuspendableService}), then this consumer.
     */
    @Override
    protected void doResume() throws Exception {
        // Single read of the volatile field: it may be nulled concurrently on leadership loss.
        Consumer consumer = delegatedConsumer;
        if (consumer instanceof SuspendableService) {
            ((SuspendableService) consumer).resume();
        }
        super.doResume();
    }

    /**
     * Suspends the delegate consumer first (when it is a {@link SuspendableService}), then this consumer.
     */
    @Override
    protected void doSuspend() throws Exception {
        // Single read of the volatile field: it may be nulled concurrently on leadership loss.
        Consumer consumer = delegatedConsumer;
        if (consumer instanceof SuspendableService) {
            ((SuspendableService) consumer).suspend();
        }
        super.doSuspend();
    }

    /**
     * Reports whether the local member of the cluster view is the leader.
     *
     * @return {@code true} if a view is held and its local member is leader; {@code false} otherwise
     */
    @ManagedAttribute(description = "Are we the master")
    public boolean isMaster() {
        // Single read of the volatile field: doStop() may null it between a check and a use.
        CamelClusterView currentView = view;
        return currentView != null && currentView.getLocalMember().isLeader();
    }

    /**
     * Schedules a back-off task that creates and starts the delegate consumer. Does nothing
     * when running is not allowed or a delegate consumer already exists.
     */
    private synchronized void onLeadershipTaken() throws Exception {
        if (!isRunAllowed() || delegatedConsumer != null) {
            return;
        }

        long delay = masterEndpoint.getComponent().getBackOffDelay();
        long maxAttempts = masterEndpoint.getComponent().getBackOffMaxAttempts();
        BackOffTimer timer = new BackOffTimer(masterEndpoint.getComponent().getBackOffThreadPool());
        timer.schedule(
                BackOff.builder().delay(delay).maxAttempts(maxAttempts).build(),
                task -> attemptDelegateStart(task.getCurrentAttempts()));
    }

    /**
     * Runs one attempt to create (if needed) and start the delegate consumer.
     *
     * @param attempt the attempt number reported by the back-off task, used for logging
     * @return {@code true} to ask the back-off timer for another attempt, {@code false} to finish
     */
    private boolean attemptDelegateStart(long attempt) {
        try {
            // The attempt runs on the back-off thread pool some time after leadership was taken.
            // Re-check state so a delegate is not (re)created once this consumer is stopping or
            // the local member is no longer leader.
            if (!isRunAllowed() || !isMaster()) {
                LOG.debug("Leadership no longer held or consumer not running. Abandoning attempt #{} to start consumer: {}",
                        attempt, delegatedEndpoint);
                return false;
            }

            LOG.info("Leadership taken. Attempt #{} to start consumer: {}", attempt, delegatedEndpoint);

            Consumer consumer = delegatedConsumer;
            if (consumer == null) {
                consumer = delegatedEndpoint.createConsumer(processor);
                delegatedConsumer = consumer;
                if (consumer instanceof StartupListener) {
                    getEndpoint().getCamelContext().addStartupListener((StartupListener) consumer);
                }
                if (consumer instanceof ResumeAware && resumeStrategy != null) {
                    configureResume((ResumeAware) consumer);
                }
            }
            ServiceHelper.startService(delegatedEndpoint, consumer);
        } catch (Exception e) {
            String message = "Leadership taken. Attempt #" + attempt
                    + " failed to start consumer due to: " + e.getMessage();
            getExceptionHandler().handleException(message, e);
            return true;
        }

        LOG.info("Leadership taken. Attempt #{} success. Consumer started: {}", attempt, delegatedEndpoint);
        return false;
    }

    /**
     * Evaluates the resume adapter for the given delegate, sets it on the configured resume
     * strategy and passes that strategy to the delegate.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the delegate's strategy type parameter is unknown here
    private void configureResume(ResumeAware resumeAwareConsumer) {
        LOG.debug("Setting up the resume adapter for the resume strategy in consumer");
        ResumeAdapter resumeAdapter
                = AdapterHelper.eval(clusterService.getCamelContext(), resumeAwareConsumer, resumeStrategy);
        resumeStrategy.setAdapter(resumeAdapter);
        LOG.debug("Setting up the resume strategy for consumer");
        resumeAwareConsumer.setResumeStrategy(resumeStrategy);
    }

    /**
     * Stops and shuts down the delegate consumer and endpoint and clears the delegate reference.
     */
    private synchronized void onLeadershipLost() {
        LOG.debug("Leadership lost. Stopping consumer: {}", delegatedEndpoint);
        try {
            ServiceHelper.stopAndShutdownServices(delegatedConsumer, delegatedEndpoint);
        } finally {
            delegatedConsumer = null;
        }
        LOG.info("Leadership lost. Consumer stopped: {}", delegatedEndpoint);
    }

    /**
     * Listener that translates leadership change events from the cluster view into starting or
     * stopping the delegate consumer. Failures are passed to the consumer's exception handler.
     */
    private final class LeadershipListener implements CamelClusterEventListener.Leadership {

        @Override
        public void leadershipChanged(CamelClusterView view, CamelClusterMember leader) {
            if (!isRunAllowed()) {
                return;
            }

            if (view.getLocalMember().isLeader()) {
                try {
                    onLeadershipTaken();
                } catch (Exception e) {
                    getExceptionHandler().handleException("Error starting consumer while taking leadership", e);
                }
            } else if (delegatedConsumer != null) {
                try {
                    onLeadershipLost();
                } catch (Exception e) {
                    getExceptionHandler().handleException(
                            "Error stopping consumer while loosing leadership. This exception is ignored.", e);
                }
            }
        }
    }
}

