/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.processor.AbstractThrottler;
import org.apache.camel.processor.ThrottlerRejectedExecutionException;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcurrentRequestsThrottler
extends AbstractThrottler {
    /** Logger shared by this throttler and its nested classes. */
    private static final Logger LOG = LoggerFactory.getLogger(ConcurrentRequestsThrottler.class);
    /** Key into {@link #states} used when no correlation expression is configured. */
    private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";
    /**
     * Exchange property holding the {@code System.nanoTime()} value captured when an exchange is
     * handed to the async executor. It is only written and read while TRACE logging is enabled.
     */
    private static final String PROPERTY_EXCHANGE_QUEUED_TIME = "CamelThrottlerExchangeQueuedTime";
    /** Exchange property holding the {@link State} under which the exchange re-enters {@code process}. */
    private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
    /**
     * Delay, in milliseconds, between the last permit release for a key and the removal of that
     * key's {@link ThrottlingState}; matches the 10000 ms delay used by {@code ThrottlingState.release}.
     */
    private static final long CLEAN_PERIOD = 10000L;
    /** Throttling state per correlation key; entries are created lazily while processing exchanges. */
    private final Map<String, ThrottlingState> states = new ConcurrentHashMap<String, ThrottlingState>();

    /**
     * Creates the throttler by forwarding every argument to the {@link AbstractThrottler}
     * constructor (note that the argument order differs from this constructor's parameter order).
     *
     * @param camelContext          the Camel context; used in this class to reach the executor service
     *                              manager during shutdown
     * @param maxRequestsExpression expression evaluated per exchange to obtain the maximum number of
     *                              concurrent requests (presumably exposed by the superclass as
     *                              {@code getMaximumRequestsExpression()} - confirm)
     * @param asyncExecutor         executor used to re-submit queued exchanges and to schedule the
     *                              cleanup of idle throttling states
     * @param shutdownAsyncExecutor presumably whether {@code asyncExecutor} is shut down together
     *                              with this processor - confirm against {@code AbstractThrottler}
     * @param rejectExecution       presumably whether exchanges that cannot get a permit are
     *                              rejected instead of delayed - confirm against {@code AbstractThrottler}
     * @param correlation           optional expression yielding the key that selects the throttling
     *                              state (presumably stored as {@code correlationExpression} - confirm)
     */
    public ConcurrentRequestsThrottler(CamelContext camelContext, Expression maxRequestsExpression, ScheduledExecutorService asyncExecutor, boolean shutdownAsyncExecutor, boolean rejectExecution, Expression correlation) {
        super(asyncExecutor, shutdownAsyncExecutor, camelContext, rejectExecution, correlation, maxRequestsExpression);
    }

    /**
     * Throttles the given exchange and signals the callback once a permit has been obtained.
     *
     * <p>The exchange may arrive here a second time after being queued by
     * {@code processAsynchronously}; in that case the state and queue timestamp are carried in
     * exchange properties, which are consumed (read and removed) here.
     *
     * @param exchange the exchange to throttle
     * @param callback the callback to notify when throttling is done
     * @return the value produced by {@code doProcess}, or by the superclass interrupt/exception
     *         handlers when processing fails
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        // The queue timestamp only feeds trace output, so it is only consumed while tracing.
        long queuedAtNanos = 0L;
        if (LOG.isTraceEnabled()) {
            queuedAtNanos = exchange.getProperty(PROPERTY_EXCHANGE_QUEUED_TIME, 0L, Long.class);
            exchange.removeProperty(PROPERTY_EXCHANGE_QUEUED_TIME);
        }

        // A missing state property means this is the first, synchronous pass.
        final State currentState = exchange.getProperty(PROPERTY_EXCHANGE_STATE, State.SYNC, State.class);
        exchange.removeProperty(PROPERTY_EXCHANGE_STATE);

        // Only an exchange running on the async executor completes asynchronously.
        final boolean completesSync = currentState == State.SYNC || currentState == State.ASYNC_REJECTED;

        try {
            if (isRunAllowed()) {
                return doProcess(exchange, callback, currentState, queuedAtNanos, completesSync);
            }
            // Thrown inside the try block on purpose so it goes through handleException below.
            throw new RejectedExecutionException("Run is not allowed");
        } catch (InterruptedException interrupted) {
            return ConcurrentRequestsThrottler.handleInterrupt(exchange, callback, interrupted, completesSync);
        } catch (Exception failure) {
            return ConcurrentRequestsThrottler.handleException(exchange, callback, failure, completesSync);
        }
    }

    /**
     * Resolves the throttling state for the exchange, refreshes its maximum, and obtains a permit.
     *
     * <p>When no permit is immediately available the exchange is either rejected, queued on the
     * async executor, or blocked in the current thread, depending on the throttler settings.
     *
     * @param exchange    the exchange being throttled
     * @param callback    the callback to notify once a permit is held
     * @param state       how this invocation was reached (first pass, queued, or queued-then-rejected)
     * @param queuedStart {@code System.nanoTime()} value captured when the exchange was queued;
     *                    only meaningful for trace output
     * @param doneSync    value passed to the callback and returned when processing continues here
     * @return {@code doneSync}, or the result of {@code processAsynchronously} when the exchange is queued
     * @throws Exception if the exchange is rejected, the wait is interrupted, or an expression
     *                   evaluation fails
     */
    private boolean doProcess(Exchange exchange, AsyncCallback callback, State state, long queuedStart, boolean doneSync) throws Exception {
        final String correlationKey = correlationExpression == null
                ? DEFAULT_KEY
                : correlationExpression.evaluate(exchange, String.class);

        final ThrottlingState permits = states.computeIfAbsent(correlationKey, ThrottlingState::new);
        permits.calculateAndSetMaxConcurrentRequestsExpression(exchange);

        if (permits.tryAcquire(exchange)) {
            // Fast path: a permit was free, nothing left to do but trace.
            if (LOG.isTraceEnabled()) {
                if (state == State.ASYNC) {
                    long queuedMillis = Duration.ofNanos(System.nanoTime() - queuedStart).toMillis();
                    LOG.trace("Queued for {}ms, No throttling applied (throttle cleared while queued), for exchangeId: {}", queuedMillis, exchange.getExchangeId());
                } else {
                    LOG.trace("No throttling applied to exchangeId: {}", exchange.getExchangeId());
                }
            }
        } else {
            if (isRejectExecution()) {
                throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of " + permits.getThrottleRate());
            }
            // Only a first-pass, non-transacted exchange may be moved to the async executor.
            final boolean queueForAsync = isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC;
            if (queueForAsync) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId());
                }
                return processAsynchronously(exchange, callback, permits);
            }
            // Otherwise block the current thread until a permit is released.
            ConcurrentRequestsThrottler.doThrottle(exchange, permits, state, queuedStart);
        }

        callback.done(doneSync);
        return doneSync;
    }

    /**
     * Blocks the calling thread until a permit is available for the exchange, tracing how long
     * the exchange was queued and throttled.
     *
     * <p>Durations are measured with {@code System.nanoTime()} and converted to milliseconds
     * before logging, so the values match the "ms" unit in the trace messages. The trace level is
     * sampled once before the blocking acquire, so a level change during the wait cannot produce
     * a measurement based on an unset start time.
     *
     * @param exchange        the exchange waiting for a permit
     * @param throttlingState the state whose permit is acquired
     * @param state           how the exchange reached the throttler; {@code ASYNC} adds the queued
     *                        time to the trace output
     * @param queuedStart     {@code System.nanoTime()} value captured when the exchange was queued
     * @throws InterruptedException if the thread is interrupted while waiting for a permit
     */
    private static void doThrottle(Exchange exchange, ThrottlingState throttlingState, State state, long queuedStart) throws InterruptedException {
        final boolean tracing = LOG.isTraceEnabled();
        final long start = tracing ? System.nanoTime() : 0L;

        throttlingState.acquire(exchange);

        if (!tracing) {
            return;
        }
        final long throttledMillis = Duration.ofNanos(System.nanoTime() - start).toMillis();
        if (state == State.ASYNC) {
            // Time spent in the executor queue before this thread started waiting for the permit.
            final long queuedMillis = Duration.ofNanos(start - queuedStart).toMillis();
            LOG.trace("Queued for {}ms, Throttled for {}ms, exchangeId: {}", queuedMillis, throttledMillis, exchange.getExchangeId());
        } else {
            LOG.trace("Throttled for {}ms, exchangeId: {}", throttledMillis, exchange.getExchangeId());
        }
    }

    /**
     * Hands the exchange to the async executor so that throttling continues on another thread.
     *
     * <p>The exchange is tagged with {@link State#ASYNC} (and, while tracing, with the current
     * {@code System.nanoTime()}) before it is submitted. If the executor rejects the task and the
     * throttler is configured to let the caller run it, the exchange is re-tagged as
     * {@link State#ASYNC_REJECTED} and processed in the current thread.
     *
     * @param exchange        the exchange to queue
     * @param callback        the callback passed on to {@code process}
     * @param throttlingState the throttling state of the exchange (not used by this implementation)
     * @return {@code false} when the exchange was queued, otherwise the result of processing it
     *         in the current thread
     * @throws RejectedExecutionException if the executor rejects the task and caller-runs is disabled
     */
    protected boolean processAsynchronously(Exchange exchange, AsyncCallback callback, ThrottlingState throttlingState) {
        try {
            if (LOG.isTraceEnabled()) {
                exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIME, System.nanoTime());
            }
            exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
            asyncExecutor.submit(() -> process(exchange, callback));
            return false;
        } catch (RejectedExecutionException rejected) {
            if (!isCallerRunsWhenRejected()) {
                throw rejected;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("AsyncExecutor is full, rejected exchange will run in the current thread, exchangeId: {}", exchange.getExchangeId());
            }
            exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC_REJECTED);
            return process(exchange, callback);
        }
    }

    /**
     * Verifies on start-up that an executor is present when async delaying is enabled.
     *
     * @throws Exception if the executor check fails
     */
    @Override
    protected void doStart() throws Exception {
        if (!isAsyncDelayed()) {
            // Without async delaying no exchange is ever queued, so no executor is demanded here.
            return;
        }
        ObjectHelper.notNull(asyncExecutor, "executorService", this);
    }

    /**
     * Shuts down the async executor when this throttler was asked to do so, drops all throttling
     * states, and then delegates to the superclass.
     *
     * @throws Exception if the superclass shutdown fails
     */
    @Override
    protected void doShutdown() throws Exception {
        final boolean mustStopExecutor = shutdownAsyncExecutor && asyncExecutor != null;
        if (mustStopExecutor) {
            camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
        }
        states.clear();
        super.doShutdown();
    }

    /**
     * Returns the name of this throttling mode.
     *
     * @return the constant string {@code "ConcurrentRequests"}
     */
    @Override
    public String getMode() {
        return "ConcurrentRequests";
    }

    /**
     * Returns the highest throttle rate currently held by any throttling state.
     *
     * @return the largest rate across all keys, or {@code 0} when no state exists
     */
    @Override
    public int getCurrentMaximumRequests() {
        boolean found = false;
        int highest = 0;
        for (ThrottlingState candidate : states.values()) {
            final int rate = candidate.getThrottleRate();
            if (!found || rate > highest) {
                highest = rate;
                found = true;
            }
        }
        return highest;
    }

    /**
     * Builds the label for this processor from the maximum-requests expression.
     *
     * @return {@code "throttle[<expression>]"}, where the expression is rendered via its string form
     */
    @Override
    public String getTraceLabel() {
        final Object expression = getMaximumRequestsExpression();
        return "throttle[" + expression + "]";
    }

    /**
     * Uses the processor id as the string form of this throttler.
     *
     * @return the value of {@code getId()}
     */
    @Override
    public String toString() {
        return getId();
    }

    /**
     * How an exchange reached {@code process}. The value travels with the exchange in the
     * {@code PROPERTY_EXCHANGE_STATE} property and decides whether the callback is completed
     * synchronously.
     */
    private static enum State {
        /** First pass in the caller's thread; also the default when the property is absent. Completes synchronously. */
        SYNC,
        /** Re-entry from the async executor after the exchange was queued. Completes asynchronously. */
        ASYNC,
        /** The async executor rejected the exchange and it runs in the submitting thread. Completes synchronously. */
        ASYNC_REJECTED;

    }

    /**
     * Throttling state for a single correlation key: a semaphore whose permit count tracks the
     * most recently evaluated maximum, plus the handle of the pending cleanup task.
     *
     * <p>Every acquired permit registers a completion callback on the exchange that releases the
     * permit when the exchange completes or fails. Each release (re)schedules removal of this
     * state from the enclosing throttler after {@code CLEAN_PERIOD} milliseconds.
     *
     * <p>NOTE(review): {@link #clean()} removes the state without checking for permits that are
     * still held; an exchange arriving afterwards with the same key gets a fresh state - confirm
     * this is intended.
     */
    protected class ThrottlingState {
        private final String key;
        /** Handle of the most recently scheduled cleanup, so an older one can be cancelled. */
        private final AtomicReference<ScheduledFuture<?>> cleanFuture = new AtomicReference<>();
        /** Last maximum applied to the semaphore; {@code 0} until the first evaluation. */
        private volatile int throttleRate;
        private final WrappedSemaphore semaphore;

        /**
         * @param key the correlation key under which this state is stored in the enclosing throttler
         */
        ThrottlingState(String key) {
            this.key = key;
            this.semaphore = new WrappedSemaphore();
        }

        /**
         * @return the maximum number of concurrent requests last applied to this state
         */
        public int getThrottleRate() {
            return throttleRate;
        }

        /** Removes the entry for this state's key from the enclosing throttler. */
        public void clean() {
            ConcurrentRequestsThrottler.this.states.remove(key);
        }

        /**
         * Tries to take a permit without waiting.
         *
         * @param exchange the exchange that will hold the permit
         * @return {@code true} if a permit was taken (a release callback is then registered on
         *         the exchange), {@code false} otherwise
         */
        public boolean tryAcquire(Exchange exchange) {
            if (!semaphore.tryAcquire()) {
                return false;
            }
            addSynchronization(exchange);
            return true;
        }

        /**
         * Waits until a permit is available, then registers the release callback on the exchange.
         *
         * @param exchange the exchange that will hold the permit
         * @throws InterruptedException if the thread is interrupted while waiting
         */
        public void acquire(Exchange exchange) throws InterruptedException {
            semaphore.acquire();
            addSynchronization(exchange);
        }

        /** Registers a completion callback that releases the permit on both success and failure. */
        private void addSynchronization(Exchange exchange) {
            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {

                @Override
                public void onComplete(Exchange exchange) {
                    ThrottlingState.this.release(exchange);
                }

                @Override
                public void onFailure(Exchange exchange) {
                    ThrottlingState.this.release(exchange);
                }
            });
        }

        /**
         * Returns a permit and schedules cleanup of this state.
         *
         * <p>The permit is always released first. Scheduling the cleanup is best effort: it is
         * skipped when no executor is configured and a rejection by the executor is only logged.
         * A previously scheduled cleanup is cancelled so that only the latest one stays pending.
         *
         * @param exchange the exchange giving the permit back
         */
        public void release(Exchange exchange) {
            semaphore.release();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
            }
            // The executor is only mandatory when async delaying is enabled (see doStart),
            // so it may legitimately be absent here.
            if (ConcurrentRequestsThrottler.this.asyncExecutor == null) {
                return;
            }
            try {
                ScheduledFuture<?> next = ConcurrentRequestsThrottler.this.asyncExecutor.schedule(this::clean, CLEAN_PERIOD, TimeUnit.MILLISECONDS);
                ScheduledFuture<?> prev = cleanFuture.getAndSet(next);
                if (prev != null) {
                    prev.cancel(false);
                }
            } catch (RejectedExecutionException e) {
                LOG.debug("Throttle cleaning rejected", e);
            }
        }

        /**
         * Evaluates the maximum-requests expression for the exchange and resizes the semaphore
         * when the value differs from the current rate.
         *
         * <p>A {@code null} result keeps the current rate, unless no rate has been set yet.
         *
         * @param exchange the exchange against which the expression is evaluated
         * @throws IllegalStateException    if the expression evaluates to a negative number
         * @throws RuntimeExchangeException if the expression evaluates to {@code null} while the
         *                                  current rate is still {@code 0}
         * @throws Exception                declared for callers; see the cases above
         */
        public synchronized void calculateAndSetMaxConcurrentRequestsExpression(Exchange exchange) throws Exception {
            Integer newThrottle = ConcurrentRequestsThrottler.this.getMaximumRequestsExpression().evaluate(exchange, Integer.class);
            if (newThrottle != null && newThrottle < 0) {
                throw new IllegalStateException("The maximumConcurrentRequests must be a positive number, was: " + newThrottle);
            }
            if (newThrottle == null && throttleRate == 0) {
                throw new RuntimeExchangeException("The maxConcurrentRequestsExpression was evaluated as null: " + String.valueOf(ConcurrentRequestsThrottler.this.getMaximumRequestsExpression()), exchange);
            }
            if (newThrottle == null || newThrottle == throttleRate) {
                // Nothing to apply: keep the current rate.
                return;
            }

            final int current = throttleRate;
            final int requested = newThrottle;
            if (current > requested) {
                semaphore.reducePermits(current - requested);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", current, requested, exchange.getExchangeId());
                }
            } else {
                semaphore.increasePermits(requested - current);
                if (LOG.isDebugEnabled()) {
                    if (current == 0) {
                        LOG.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", requested, exchange.getExchangeId());
                    } else {
                        LOG.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", current, requested, exchange.getExchangeId());
                    }
                }
            }
            throttleRate = requested;
        }
    }

    /**
     * Fair {@link Semaphore} that starts with zero permits and exposes permit resizing.
     */
    private static class WrappedSemaphore extends Semaphore {

        /** Creates a fair semaphore holding no permits. */
        public WrappedSemaphore() {
            super(0, true);
        }

        /**
         * Tries to take one permit without waiting, via the timed variant with a zero timeout
         * (the variant the {@link Semaphore} documentation recommends for honouring fairness).
         *
         * @return {@code true} if a permit was taken; {@code false} if none was available or the
         *         thread was interrupted (the interrupt flag is restored in that case)
         */
        @Override
        public boolean tryAcquire() {
            boolean obtained;
            try {
                obtained = super.tryAcquire(0L, TimeUnit.NANOSECONDS);
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                obtained = false;
            }
            return obtained;
        }

        /**
         * Widens the visibility of {@link Semaphore#reducePermits(int)} to public.
         *
         * @param n the number of permits to remove
         */
        @Override
        public void reducePermits(int n) {
            super.reducePermits(n);
        }

        /**
         * Adds permits by releasing them.
         *
         * @param n the number of permits to add
         */
        public void increasePermits(int n) {
            release(n);
        }
    }
}

