/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.throttling;

import java.util.LinkedHashSet;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.NonManagedService;
import org.apache.camel.Route;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.Configurer;
import org.apache.camel.spi.Metadata;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.RoutePolicySupport;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Metadata(label="bean", description="A throttle based RoutePolicy which is capable of dynamic throttling a route based on number of current inflight exchanges.", annotations={"interfaceName=org.apache.camel.spi.RoutePolicy"})
@Configurer(metadataOnly=true)
public class ThrottlingInflightRoutePolicy
extends RoutePolicySupport
implements CamelContextAware {
    private static final Logger LOG = LoggerFactory.getLogger(ThrottlingInflightRoutePolicy.class);
    private final Set<Route> routes = new LinkedHashSet<Route>();
    private ContextScopedEventNotifier eventNotifier;
    private CamelContext camelContext;
    private final Lock lock = new ReentrantLock();
    @Metadata(description="Sets which scope the throttling should be based upon, either route or total scoped.", enums="Context,Route", defaultValue="Route")
    private ThrottlingScope scope = ThrottlingScope.Route;
    @Metadata(description="Sets the upper limit of number of concurrent inflight exchanges at which point reached the throttler should suspend the route.", defaultValue="1000")
    private int maxInflightExchanges = 1000;
    @Metadata(description="Sets at which percentage of the max the throttler should start resuming the route.", defaultValue="70")
    private int resumePercentOfMax = 70;
    private int resumeInflightExchanges = 700;
    @Metadata(description="Sets the logging level to report the throttling activity.", javaType="org.apache.camel.LoggingLevel", defaultValue="INFO", enums="TRACE,DEBUG,INFO,WARN,ERROR,OFF")
    private LoggingLevel loggingLevel = LoggingLevel.INFO;
    private CamelLogger logger;

    public String toString() {
        return "ThrottlingInflightRoutePolicy[" + this.maxInflightExchanges + " / " + this.resumePercentOfMax + "% using scope " + this.scope + "]";
    }

    @Override
    public CamelContext getCamelContext() {
        return this.camelContext;
    }

    @Override
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    @Override
    public void onInit(Route route) {
        this.routes.add(route);
    }

    @Override
    public void onExchangeDone(Route route, Exchange exchange) {
        if (this.scope == ThrottlingScope.Route) {
            this.throttle(route, exchange);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void throttle(Route route, Exchange exchange) {
        boolean start;
        boolean stop;
        Consumer consumer = route.getConsumer();
        int size = this.getSize(route, exchange);
        boolean bl = stop = this.maxInflightExchanges > 0 && size > this.maxInflightExchanges;
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} > 0 && {} > {} evaluated as {}", this.maxInflightExchanges, size, this.maxInflightExchanges, stop);
        }
        if (stop) {
            try {
                this.lock.lock();
                this.stopConsumer(size, consumer);
            }
            catch (Exception e) {
                this.handleException(e);
            }
            finally {
                this.lock.unlock();
            }
        }
        boolean bl2 = start = (size = this.getSize(route, exchange)) <= this.resumeInflightExchanges;
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} <= {} evaluated as {}", size, this.resumeInflightExchanges, start);
        }
        if (start) {
            try {
                this.lock.lock();
                this.startConsumer(size, consumer);
            }
            catch (Exception e) {
                this.handleException(e);
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    public int getMaxInflightExchanges() {
        return this.maxInflightExchanges;
    }

    public void setMaxInflightExchanges(int maxInflightExchanges) {
        this.maxInflightExchanges = maxInflightExchanges;
        this.resumeInflightExchanges = Math.max(this.resumePercentOfMax * maxInflightExchanges / 100, 1);
    }

    public int getResumePercentOfMax() {
        return this.resumePercentOfMax;
    }

    public void setResumePercentOfMax(int resumePercentOfMax) {
        if (resumePercentOfMax < 0 || resumePercentOfMax > 100) {
            throw new IllegalArgumentException("Must be a percentage between 0 and 100, was: " + resumePercentOfMax);
        }
        this.resumePercentOfMax = resumePercentOfMax;
        this.resumeInflightExchanges = Math.max(resumePercentOfMax * this.maxInflightExchanges / 100, 1);
    }

    public ThrottlingScope getScope() {
        return this.scope;
    }

    public void setScope(ThrottlingScope scope) {
        this.scope = scope;
    }

    public LoggingLevel getLoggingLevel() {
        return this.loggingLevel;
    }

    public CamelLogger getLogger() {
        if (this.logger == null) {
            this.logger = this.createLogger();
        }
        return this.logger;
    }

    public void setLogger(CamelLogger logger2) {
        this.logger = logger2;
    }

    public void setLoggingLevel(LoggingLevel loggingLevel) {
        this.loggingLevel = loggingLevel;
    }

    protected CamelLogger createLogger() {
        return new CamelLogger(LOG, this.getLoggingLevel());
    }

    private int getSize(Route route, Exchange exchange) {
        if (this.scope == ThrottlingScope.Context) {
            return exchange.getContext().getInflightRepository().size();
        }
        return exchange.getContext().getInflightRepository().size(route.getId());
    }

    private void startConsumer(int size, Consumer consumer) throws Exception {
        boolean started = this.resumeOrStartConsumer(consumer);
        if (started) {
            this.getLogger().log("Throttling consumer: " + size + " <= " + this.resumeInflightExchanges + " inflight exchange by resuming consumer: " + consumer);
        }
    }

    private void stopConsumer(int size, Consumer consumer) throws Exception {
        boolean stopped = this.suspendOrStopConsumer(consumer);
        if (stopped) {
            this.getLogger().log("Throttling consumer: " + size + " > " + this.maxInflightExchanges + " inflight exchange by suspending consumer: " + consumer);
        }
    }

    @Override
    protected void doStart() throws Exception {
        ObjectHelper.notNull(this.camelContext, "CamelContext", this);
        if (this.scope == ThrottlingScope.Context) {
            this.eventNotifier = new ContextScopedEventNotifier();
            ServiceHelper.startService(this.eventNotifier);
            this.camelContext.getManagementStrategy().addEventNotifier(this.eventNotifier);
        }
    }

    @Override
    protected void doStop() throws Exception {
        ObjectHelper.notNull(this.camelContext, "CamelContext", this);
        if (this.scope == ThrottlingScope.Context) {
            this.camelContext.getManagementStrategy().removeEventNotifier(this.eventNotifier);
        }
    }

    public static enum ThrottlingScope {
        Context,
        Route;

    }

    private class ContextScopedEventNotifier
    extends EventNotifierSupport
    implements NonManagedService {
        private ContextScopedEventNotifier() {
        }

        @Override
        public void notify(CamelEvent event) {
            CamelEvent.ExchangeCompletedEvent completedEvent = (CamelEvent.ExchangeCompletedEvent)event;
            for (Route route : ThrottlingInflightRoutePolicy.this.routes) {
                ThrottlingInflightRoutePolicy.this.throttle(route, completedEvent.getExchange());
            }
        }

        @Override
        public boolean isEnabled(CamelEvent event) {
            return event instanceof CamelEvent.ExchangeCompletedEvent;
        }

        public String toString() {
            return "ContextScopedEventNotifier";
        }
    }
}

