/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.impl.engine;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StaticService;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.processor.DefaultExchangeFormatter;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultAsyncProcessorAwaitManager
extends ServiceSupport
implements AsyncProcessorAwaitManager,
StaticService {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultAsyncProcessorAwaitManager.class);
    private final AsyncProcessorAwaitManager.Statistics statistics = new UtilizationStatistics();
    private final AtomicLong blockedCounter = new AtomicLong();
    private final AtomicLong interruptedCounter = new AtomicLong();
    private final AtomicLong totalDuration = new AtomicLong();
    private final AtomicLong minDuration = new AtomicLong();
    private final AtomicLong maxDuration = new AtomicLong();
    private final AtomicLong meanDuration = new AtomicLong();
    private final Map<Exchange, AsyncProcessorAwaitManager.AwaitThread> inflight = new ConcurrentHashMap<Exchange, AsyncProcessorAwaitManager.AwaitThread>();
    private final ExchangeFormatter exchangeFormatter;
    private boolean interruptThreadsWhileStopping = true;

    public DefaultAsyncProcessorAwaitManager() {
        DefaultExchangeFormatter formatter = new DefaultExchangeFormatter();
        formatter.setShowExchangeId(true);
        formatter.setMultiline(true);
        formatter.setShowHeaders(true);
        formatter.setStyle(DefaultExchangeFormatter.OutputStyle.Fixed);
        this.exchangeFormatter = formatter;
    }

    @Override
    public void process(AsyncProcessor processor, Exchange exchange) {
        CountDownLatch latch = new CountDownLatch(1);
        processor.process(exchange, (boolean doneSync) -> this.countDown(exchange, latch));
        if (latch.getCount() > 0L) {
            this.await(exchange, latch);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void await(Exchange exchange, CountDownLatch latch) {
        block19: {
            long mean;
            ReactiveExecutor reactiveExecutor = exchange.getContext().getCamelContextExtension().getReactiveExecutor();
            do {
                if (latch.getCount() > 0L) continue;
                return;
            } while (reactiveExecutor.executeFromQueue());
            if (LOG.isTraceEnabled()) {
                LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}", (Object)exchange.getExchangeId(), (Object)exchange);
            }
            try {
                if (this.statistics.isStatisticsEnabled()) {
                    this.blockedCounter.incrementAndGet();
                }
                this.inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));
                latch.await();
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}", (Object)exchange.getExchangeId(), (Object)exchange);
                }
                AsyncProcessorAwaitManager.AwaitThread thread = this.inflight.remove(exchange);
                if (!this.statistics.isStatisticsEnabled() || thread == null) break block19;
                long time = thread.getWaitDuration();
                long total = this.totalDuration.get() + time;
                this.totalDuration.set(total);
                if (time < this.minDuration.get()) {
                    this.minDuration.set(time);
                } else if (time > this.maxDuration.get()) {
                    this.maxDuration.set(time);
                }
                long count = this.blockedCounter.get();
                mean = count > 0L ? total / count : 0L;
            }
            catch (InterruptedException e) {
                long mean2;
                try {
                    Thread.currentThread().interrupt();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}", (Object)exchange.getExchangeId(), (Object)exchange);
                    }
                    exchange.setException(e);
                    AsyncProcessorAwaitManager.AwaitThread thread = this.inflight.remove(exchange);
                    if (!this.statistics.isStatisticsEnabled() || thread == null) break block19;
                    long time = thread.getWaitDuration();
                    long total = this.totalDuration.get() + time;
                    this.totalDuration.set(total);
                    if (time < this.minDuration.get()) {
                        this.minDuration.set(time);
                    } else if (time > this.maxDuration.get()) {
                        this.maxDuration.set(time);
                    }
                    long count = this.blockedCounter.get();
                    mean2 = count > 0L ? total / count : 0L;
                }
                catch (Throwable throwable) {
                    AsyncProcessorAwaitManager.AwaitThread thread = this.inflight.remove(exchange);
                    if (this.statistics.isStatisticsEnabled() && thread != null) {
                        long time = thread.getWaitDuration();
                        long total = this.totalDuration.get() + time;
                        this.totalDuration.set(total);
                        if (time < this.minDuration.get()) {
                            this.minDuration.set(time);
                        } else if (time > this.maxDuration.get()) {
                            this.maxDuration.set(time);
                        }
                        long count = this.blockedCounter.get();
                        long mean3 = count > 0L ? total / count : 0L;
                        this.meanDuration.set(mean3);
                    }
                    throw throwable;
                }
                this.meanDuration.set(mean2);
            }
            this.meanDuration.set(mean);
        }
    }

    public void countDown(Exchange exchange, CountDownLatch latch) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Asynchronous callback received for exchangeId: {}", (Object)exchange.getExchangeId());
        }
        latch.countDown();
    }

    @Override
    public int size() {
        return this.inflight.size();
    }

    @Override
    public Collection<AsyncProcessorAwaitManager.AwaitThread> browse() {
        return Collections.unmodifiableCollection(this.inflight.values());
    }

    @Override
    public void interrupt(String exchangeId) {
        Exchange found = null;
        for (AsyncProcessorAwaitManager.AwaitThread entry : this.browse()) {
            Exchange exchange = entry.getExchange();
            if (!exchangeId.equals(exchange.getExchangeId())) continue;
            found = exchange;
            break;
        }
        if (found != null) {
            this.interrupt(found);
        }
    }

    @Override
    public void interrupt(Exchange exchange) {
        AwaitThreadEntry entry = (AwaitThreadEntry)this.inflight.get(exchange);
        if (entry != null) {
            try {
                StringBuilder sb = new StringBuilder(512);
                sb.append("Interrupted while waiting for asynchronous callback, will release the following blocked thread which was waiting for exchange to finish processing with exchangeId: ");
                sb.append(exchange.getExchangeId());
                sb.append("\n");
                sb.append(DefaultAsyncProcessorAwaitManager.dumpBlockedThread(entry));
                String routeStackTrace = MessageHelper.dumpMessageHistoryStacktrace(exchange, this.exchangeFormatter, false);
                sb.append(routeStackTrace);
                LOG.warn(sb.toString());
            }
            catch (Exception e) {
                throw RuntimeCamelException.wrapRuntimeCamelException(e);
            }
            finally {
                if (this.statistics.isStatisticsEnabled()) {
                    this.interruptedCounter.incrementAndGet();
                }
                exchange.setException(new RejectedExecutionException("Interrupted while waiting for asynchronous callback for exchangeId: " + exchange.getExchangeId()));
                exchange.getExchangeExtension().setInterrupted(true);
                entry.getLatch().countDown();
            }
        }
    }

    @Override
    public boolean isInterruptThreadsWhileStopping() {
        return this.interruptThreadsWhileStopping;
    }

    @Override
    public void setInterruptThreadsWhileStopping(boolean interruptThreadsWhileStopping) {
        this.interruptThreadsWhileStopping = interruptThreadsWhileStopping;
    }

    @Override
    public AsyncProcessorAwaitManager.Statistics getStatistics() {
        return this.statistics;
    }

    @Override
    protected void doStop() throws Exception {
        Collection<AsyncProcessorAwaitManager.AwaitThread> threads = this.browse();
        int count = threads.size();
        if (count > 0) {
            LOG.warn("Shutting down while there are still {} inflight threads currently blocked.", (Object)count);
            StringBuilder sb = new StringBuilder(1024);
            for (AsyncProcessorAwaitManager.AwaitThread entry : threads) {
                sb.append(DefaultAsyncProcessorAwaitManager.dumpBlockedThread(entry));
            }
            if (this.isInterruptThreadsWhileStopping()) {
                LOG.warn("The following threads are blocked and will be interrupted so the threads are released:\n{}", (Object)sb);
                for (AsyncProcessorAwaitManager.AwaitThread entry : threads) {
                    try {
                        this.interrupt(entry.getExchange());
                    }
                    catch (Exception e) {
                        LOG.warn("Error while interrupting thread: {}. This exception is ignored.", (Object)entry.getBlockedThread().getName(), (Object)e);
                    }
                }
            } else {
                LOG.warn("The following threads are blocked, and may reside in the JVM:\n{}", (Object)sb);
            }
        } else {
            LOG.debug("Shutting down with no inflight threads.");
        }
        this.inflight.clear();
    }

    private static String dumpBlockedThread(AsyncProcessorAwaitManager.AwaitThread entry) {
        StringBuilder sb = new StringBuilder(512);
        sb.append("\n");
        sb.append("Blocked Thread\n");
        sb.append("---------------------------------------------------------------------------------------------------------------------------------------\n");
        sb.append(DefaultAsyncProcessorAwaitManager.style("Id:")).append(entry.getBlockedThread().getId()).append("\n");
        sb.append(DefaultAsyncProcessorAwaitManager.style("Name:")).append(entry.getBlockedThread().getName()).append("\n");
        sb.append(DefaultAsyncProcessorAwaitManager.style("RouteId:")).append(DefaultAsyncProcessorAwaitManager.safeNull(entry.getRouteId())).append("\n");
        sb.append(DefaultAsyncProcessorAwaitManager.style("NodeId:")).append(DefaultAsyncProcessorAwaitManager.safeNull(entry.getNodeId())).append("\n");
        sb.append(DefaultAsyncProcessorAwaitManager.style("Duration:")).append(entry.getWaitDuration()).append(" msec.\n");
        return sb.toString();
    }

    private static String style(String label) {
        return String.format("\t%-20s", label);
    }

    private static String safeNull(Object value) {
        return value != null ? value.toString() : "";
    }

    private final class UtilizationStatistics
    implements AsyncProcessorAwaitManager.Statistics {
        private boolean statisticsEnabled;

        private UtilizationStatistics() {
        }

        @Override
        public long getThreadsBlocked() {
            return DefaultAsyncProcessorAwaitManager.this.blockedCounter.get();
        }

        @Override
        public long getThreadsInterrupted() {
            return DefaultAsyncProcessorAwaitManager.this.interruptedCounter.get();
        }

        @Override
        public long getTotalDuration() {
            return DefaultAsyncProcessorAwaitManager.this.totalDuration.get();
        }

        @Override
        public long getMinDuration() {
            return DefaultAsyncProcessorAwaitManager.this.minDuration.get();
        }

        @Override
        public long getMaxDuration() {
            return DefaultAsyncProcessorAwaitManager.this.maxDuration.get();
        }

        @Override
        public long getMeanDuration() {
            return DefaultAsyncProcessorAwaitManager.this.meanDuration.get();
        }

        @Override
        public void reset() {
            DefaultAsyncProcessorAwaitManager.this.blockedCounter.set(0L);
            DefaultAsyncProcessorAwaitManager.this.interruptedCounter.set(0L);
            DefaultAsyncProcessorAwaitManager.this.totalDuration.set(0L);
            DefaultAsyncProcessorAwaitManager.this.minDuration.set(0L);
            DefaultAsyncProcessorAwaitManager.this.maxDuration.set(0L);
            DefaultAsyncProcessorAwaitManager.this.meanDuration.set(0L);
        }

        @Override
        public boolean isStatisticsEnabled() {
            return this.statisticsEnabled;
        }

        @Override
        public void setStatisticsEnabled(boolean statisticsEnabled) {
            this.statisticsEnabled = statisticsEnabled;
        }

        public String toString() {
            return String.format("AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]", this.getThreadsBlocked(), this.getThreadsInterrupted(), this.getTotalDuration(), this.getMinDuration(), this.getMaxDuration(), this.getMeanDuration());
        }
    }

    private static final class AwaitThreadEntry
    implements AsyncProcessorAwaitManager.AwaitThread {
        private final Thread thread;
        private final Exchange exchange;
        private final CountDownLatch latch;
        private final StopWatch watch = new StopWatch();

        private AwaitThreadEntry(Thread thread, Exchange exchange, CountDownLatch latch) {
            this.thread = thread;
            this.exchange = exchange;
            this.latch = latch;
        }

        @Override
        public Thread getBlockedThread() {
            return this.thread;
        }

        @Override
        public Exchange getExchange() {
            return this.exchange;
        }

        @Override
        public long getWaitDuration() {
            return this.watch.taken();
        }

        @Override
        public String getRouteId() {
            return ExchangeHelper.getAtRouteId(this.exchange);
        }

        @Override
        public String getNodeId() {
            return this.exchange.getExchangeExtension().getHistoryNodeId();
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public String toString() {
            return "AwaitThreadEntry[name=" + this.thread.getName() + ", exchangeId=" + this.exchange.getExchangeId() + "]";
        }
    }
}

