/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.processor.AbstractThrottler;
import org.apache.camel.processor.ThrottlerRejectedExecutionException;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TotalRequestsThrottler
extends AbstractThrottler {
    private static final Logger LOG = LoggerFactory.getLogger(TotalRequestsThrottler.class);
    private long timePeriodMillis;
    private final long cleanPeriodMillis;
    private final Expression correlationExpression;
    private final Map<String, ThrottlingState> states = new ConcurrentHashMap<String, ThrottlingState>();

    public TotalRequestsThrottler(CamelContext camelContext, Expression maxRequestsExpression, long timePeriodMillis, ScheduledExecutorService asyncExecutor, boolean shutdownAsyncExecutor, boolean rejectExecution, Expression correlation) {
        super(asyncExecutor, shutdownAsyncExecutor, camelContext, rejectExecution, correlation, maxRequestsExpression);
        if (timePeriodMillis <= 0L) {
            throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
        }
        this.timePeriodMillis = timePeriodMillis;
        this.cleanPeriodMillis = timePeriodMillis * 10L;
        this.correlationExpression = correlation;
    }

    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        long queuedStart = 0L;
        if (LOG.isTraceEnabled()) {
            queuedStart = exchange.getProperty("CamelThrottlerExchangeQueuedTimestamp", (Object)0L, Long.class);
            exchange.removeProperty("CamelThrottlerExchangeQueuedTimestamp");
        }
        AbstractThrottler.State state = exchange.getProperty("CamelThrottlerExchangeState", (Object)AbstractThrottler.State.SYNC, AbstractThrottler.State.class);
        exchange.removeProperty("CamelThrottlerExchangeState");
        boolean doneSync = state == AbstractThrottler.State.SYNC || state == AbstractThrottler.State.ASYNC_REJECTED;
        try {
            if (!this.isRunAllowed()) {
                throw new RejectedExecutionException("Run is not allowed");
            }
            String key = "CamelThrottlerDefaultKey";
            if (this.correlationExpression != null) {
                key = this.correlationExpression.evaluate(exchange, String.class);
            }
            ThrottlingState throttlingState = this.states.computeIfAbsent(key, x$0 -> new ThrottlingState((String)x$0));
            throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange);
            ThrottlePermit permit = throttlingState.poll();
            if (permit == null) {
                if (this.isRejectExecution()) {
                    throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of " + throttlingState.getThrottleRate() + " within " + this.timePeriodMillis + "ms");
                }
                if (this.isAsyncDelayed() && !exchange.isTransacted() && state == AbstractThrottler.State.SYNC) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", (Object)exchange.getExchangeId());
                    }
                    return this.processAsynchronously(exchange, callback, throttlingState);
                }
                long start = 0L;
                long elapsed = 0L;
                if (LOG.isTraceEnabled()) {
                    start = System.currentTimeMillis();
                }
                permit = throttlingState.take();
                if (LOG.isTraceEnabled()) {
                    elapsed = System.currentTimeMillis() - start;
                }
                throttlingState.enqueue(permit, exchange);
                if (state == AbstractThrottler.State.ASYNC) {
                    if (LOG.isTraceEnabled()) {
                        long queuedTime = start - queuedStart;
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedTime, elapsed, exchange.getExchangeId());
                        }
                    }
                } else if (LOG.isTraceEnabled()) {
                    LOG.trace("Throttled for {}ms, exchangeId: {}", (Object)elapsed, (Object)exchange.getExchangeId());
                }
            } else {
                throttlingState.enqueue(permit, exchange);
                if (state == AbstractThrottler.State.ASYNC) {
                    if (LOG.isTraceEnabled()) {
                        long queuedTime = System.currentTimeMillis() - queuedStart;
                        LOG.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", (Object)queuedTime, (Object)exchange.getExchangeId());
                    }
                } else if (LOG.isTraceEnabled()) {
                    LOG.trace("No throttling applied to exchangeId: {}", (Object)exchange.getExchangeId());
                }
            }
            callback.done(doneSync);
            return doneSync;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return TotalRequestsThrottler.handleInterrupt(exchange, callback, e, doneSync);
        }
        catch (Exception e) {
            return TotalRequestsThrottler.handleException(exchange, callback, e, doneSync);
        }
    }

    protected boolean processAsynchronously(Exchange exchange, AsyncCallback callback, ThrottlingState throttlingState) {
        try {
            if (LOG.isTraceEnabled()) {
                exchange.setProperty("CamelThrottlerExchangeQueuedTimestamp", (Object)System.nanoTime());
            }
            exchange.setProperty("CamelThrottlerExchangeState", (Object)AbstractThrottler.State.ASYNC);
            long delay = throttlingState.peek().getDelay(TimeUnit.NANOSECONDS);
            this.asyncExecutor.schedule(() -> this.process(exchange, callback), delay, TimeUnit.NANOSECONDS);
            return false;
        }
        catch (RejectedExecutionException e) {
            if (this.isCallerRunsWhenRejected()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", (Object)exchange.getExchangeId());
                }
                exchange.setProperty("CamelThrottlerExchangeState", (Object)AbstractThrottler.State.ASYNC_REJECTED);
                return this.process(exchange, callback);
            }
            throw e;
        }
    }

    @Override
    protected void doStart() throws Exception {
        if (this.isAsyncDelayed()) {
            ObjectHelper.notNull(this.asyncExecutor, "executorService", this);
        }
    }

    @Override
    protected void doShutdown() throws Exception {
        if (this.shutdownAsyncExecutor && this.asyncExecutor != null) {
            this.camelContext.getExecutorServiceManager().shutdownNow(this.asyncExecutor);
        }
        this.states.clear();
        super.doShutdown();
    }

    @Override
    public String getMode() {
        return "TotalRequests";
    }

    @Override
    public int getCurrentMaximumRequests() {
        return this.states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0);
    }

    public void setTimePeriodMillis(long timePeriodMillis) {
        this.timePeriodMillis = timePeriodMillis;
    }

    public long getTimePeriodMillis() {
        return this.timePeriodMillis;
    }

    @Override
    public String getTraceLabel() {
        return "throttle[" + this.getMaximumRequestsExpression() + " per: " + this.timePeriodMillis + "]";
    }

    public String toString() {
        return this.id;
    }

    protected class ThrottlingState {
        private final String key;
        private final Lock lock = new ReentrantLock();
        private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue();
        private final AtomicReference<ScheduledFuture<?>> cleanFuture = new AtomicReference();
        private volatile int throttleRate;

        ThrottlingState(String key) {
            this.key = key;
        }

        public int getThrottleRate() {
            return this.throttleRate;
        }

        public ThrottlePermit poll() {
            return (ThrottlePermit)this.delayQueue.poll();
        }

        public ThrottlePermit peek() {
            return (ThrottlePermit)this.delayQueue.peek();
        }

        public ThrottlePermit take() throws InterruptedException {
            return (ThrottlePermit)this.delayQueue.take();
        }

        public void clean() {
            TotalRequestsThrottler.this.states.remove(this.key);
        }

        public void enqueue(ThrottlePermit permit, Exchange exchange) {
            permit.setDelayMs(TotalRequestsThrottler.this.getTimePeriodMillis());
            this.delayQueue.put(permit);
            try {
                ScheduledFuture<?> next = TotalRequestsThrottler.this.asyncExecutor.schedule(this::clean, TotalRequestsThrottler.this.cleanPeriodMillis, TimeUnit.MILLISECONDS);
                ScheduledFuture<?> prev = this.cleanFuture.getAndSet(next);
                if (prev != null) {
                    prev.cancel(false);
                }
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Permit released, for exchangeId: {}", (Object)exchange.getExchangeId());
                }
            }
            catch (RejectedExecutionException e) {
                LOG.debug("Throttling queue cleaning rejected", e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void calculateAndSetMaxRequestsPerPeriod(Exchange exchange) throws Exception {
            this.lock.lock();
            try {
                Integer newThrottle = TotalRequestsThrottler.this.getMaximumRequestsExpression().evaluate(exchange, Integer.class);
                if (newThrottle != null && newThrottle < 0) {
                    throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
                }
                if (newThrottle == null && this.throttleRate == 0) {
                    throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + TotalRequestsThrottler.this.getMaximumRequestsExpression(), exchange);
                }
                if (newThrottle != null && newThrottle != this.throttleRate) {
                    if (this.throttleRate > newThrottle) {
                        for (int delta = this.throttleRate - newThrottle; delta > 0; --delta) {
                            this.delayQueue.take();
                            if (!LOG.isTraceEnabled()) continue;
                            LOG.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", (Object)exchange.getExchangeId());
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", this.throttleRate, newThrottle, exchange.getExchangeId());
                        }
                    } else if (newThrottle > this.throttleRate) {
                        int delta = newThrottle - this.throttleRate;
                        for (int i = 0; i < delta; ++i) {
                            this.delayQueue.put(new ThrottlePermit(-1L));
                        }
                        if (this.throttleRate == 0) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", (Object)newThrottle, (Object)exchange.getExchangeId());
                            }
                        } else if (LOG.isDebugEnabled()) {
                            LOG.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", this.throttleRate, newThrottle, exchange.getExchangeId());
                        }
                    }
                    this.throttleRate = newThrottle;
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    private static class ThrottlePermit
    implements Delayed {
        private volatile long scheduledTime;

        ThrottlePermit(long delayMs) {
            this.setDelayMs(delayMs);
        }

        public void setDelayMs(long delayMs) {
            this.scheduledTime = System.currentTimeMillis() + delayMs;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.scheduledTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}

