package org.apache.camel.processor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.ErrorHandlerAware;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.InternalProcessorFactory;
import org.apache.camel.spi.ProcessorExchangeFactory;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RestConfiguration;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.LRUCacheFactory;
import org.apache.camel.support.PatternHelper;
import org.apache.camel.support.PluginHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.concurrent.AsyncCompletionService;
import org.apache.camel.util.concurrent.Rejectable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:org/apache/camel/processor/MulticastProcessor.class */
public class MulticastProcessor extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware, ErrorHandlerAware {
    /** Logger shared by this processor and its nested task classes. */
    private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class);

    // --- collaborators ------------------------------------------------------------------

    /** Processor handed to the constructor as the "on prepare" hook; may be null. */
    protected final Processor onPrepare;
    /** Per-processor exchange factory derived from the context; used to release the pair exchanges when done. */
    protected final ProcessorExchangeFactory processorExchangeFactory;
    /** Used by {@code process} to run the multicast in a blocking fashion when {@code synchronous} is set. */
    private final AsyncProcessorAwaitManager awaitManager;
    private final CamelContext camelContext;
    private final InternalProcessorFactory internalProcessorFactory;
    /** Owning route; may be null, in which case {@code doInit} does not pre-wrap processors in error handlers. */
    private final Route route;
    /** Executor on which the multicast tasks and the completion callback are scheduled. */
    private final ReactiveExecutor reactiveExecutor;

    // --- mutable identity / wiring (populated through setters) ----------------------------

    private Processor errorHandler;
    private String id;
    private String routeId;

    // --- configuration ------------------------------------------------------------------

    /** The processors every incoming exchange is multicast to. */
    private final Collection<Processor> processors;
    private final AggregationStrategy aggregationStrategy;
    /** True when requested explicitly or when an executor service was supplied to the constructor. */
    private final boolean parallelProcessing;
    /** When true, {@code process} completes the callback synchronously after waiting for the result. */
    private boolean synchronous;
    private final boolean streaming;
    /** When false, aggregation goes through the {@code synchronized} doAggregateSync path. */
    private final boolean parallelAggregate;
    private final boolean stopOnException;
    /** Executor used to run tasks when parallel processing is enabled. */
    private final ExecutorService executorService;
    private final boolean shutdownExecutorService;
    /** Executor adapter that forwards to {@code schedule(Runnable)}; handed to the completion service. */
    private final Scheduler scheduler;
    /** Executor on which the timeout task is scheduled when {@code timeout > 0}. */
    private ExecutorService aggregateExecutorService;
    private boolean shutdownAggregateExecutorService;
    /** Total timeout in milliseconds; values {@code <= 0} disable the timeout task. */
    private final long timeout;
    private final int cacheSize;
    /** LRU cache created when {@code cacheSize >= 0}; {@code null} otherwise. */
    private final ConcurrentMap<Processor, Processor> errorHandlers;
    private final boolean shareUnitOfWork;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/camel/processor/MulticastProcessor$DefaultProcessorExchangePair.class */
    /**
     * Immutable {@link ProcessorExchangePair} that ties one target processor to the exchange that will be sent to it.
     * <p>
     * Two processor references are kept: the raw one (consulted only to expose a {@link Producer}) and the prepared one
     * that is actually returned by {@link #getProcessor()}.
     */
    public static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
        private final int index;
        private final Processor processor;
        private final Processor prepared;
        private final Exchange exchange;

        /**
         * @param i          position of this pair within the multicast
         * @param processor  the raw target processor
         * @param processor2 the prepared processor that will be invoked
         * @param exchange   the exchange to send to the processor
         */
        private DefaultProcessorExchangePair(int i, Processor processor, Processor processor2, Exchange exchange) {
            index = i;
            this.processor = processor;
            prepared = processor2;
            this.exchange = exchange;
        }

        /** @return the position of this pair within the multicast */
        @Override
        public int getIndex() {
            return index;
        }

        /** @return the exchange associated with this pair */
        @Override
        public Exchange getExchange() {
            return exchange;
        }

        /** @return the raw processor as a {@link Producer}, or {@code null} when it is not one */
        @Override
        public Producer getProducer() {
            return processor instanceof Producer ? (Producer) processor : null;
        }

        /** @return the prepared processor to invoke */
        @Override
        public Processor getProcessor() {
            return prepared;
        }

        /** No-op: this pair has nothing to set up before sending. */
        @Override
        public void begin() {
            // intentionally empty
        }

        /** No-op: this pair has nothing to tear down after sending. */
        @Override
        public void done() {
            // intentionally empty
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/camel/processor/MulticastProcessor$MulticastReactiveTask.class */
    public class MulticastReactiveTask extends MulticastTask {
        public MulticastReactiveTask(Exchange exchange, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback, int i) {
            super(exchange, iterable, asyncCallback, i);
        }

        @Override // org.apache.camel.processor.MulticastProcessor.MulticastTask, java.lang.Runnable
        public void run() {
            super.run();
            try {
                if (this.done.get()) {
                    return;
                }
                ProcessorExchangePair nextProcessorExchangePair = getNextProcessorExchangePair();
                if (nextProcessorExchangePair == null) {
                    doDone(this.result.get(), true);
                    return;
                }
                boolean hasNext = this.iterator.hasNext();
                Exchange exchange = nextProcessorExchangePair.getExchange();
                int andIncrement = this.nbExchangeSent.getAndIncrement();
                MulticastProcessor.this.updateNewExchange(exchange, andIncrement, this.pairs, hasNext);
                if (!hasNext) {
                    this.allSent.set(true);
                }
                this.completion.submit(consumer -> {
                    StopWatch beforeSend = MulticastProcessor.this.beforeSend(nextProcessorExchangePair);
                    AsyncProcessorConverterHelper.convert(nextProcessorExchangePair.getProcessor()).process(exchange, z -> {
                        MulticastProcessor.this.afterSend(nextProcessorExchangePair, beforeSend);
                        String str = null;
                        if (MulticastProcessor.LOG.isDebugEnabled()) {
                            str = "Multicast processing failed for number " + andIncrement;
                        }
                        boolean continueProcessing = PipelineHelper.continueProcessing(exchange, str, MulticastProcessor.LOG);
                        if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                            if (exchange.getException() != null) {
                                exchange.setException(new CamelExchangeException("Multicast processing failed for number " + andIncrement, exchange, exchange.getException()));
                            } else {
                                this.result.set(exchange);
                            }
                            doDone(exchange, true);
                            return;
                        }
                        consumer.accept(exchange);
                        aggregate();
                        if (!hasNext || MulticastProcessor.this.isParallelProcessing()) {
                            return;
                        }
                        MulticastProcessor.this.schedule(this);
                    });
                });
                if (hasNext && MulticastProcessor.this.isParallelProcessing()) {
                    MulticastProcessor.this.schedule(this);
                }
            } catch (Exception e) {
                this.original.setException(e);
                doDone(null, false);
            }
        }

        private ProcessorExchangePair getNextProcessorExchangePair() {
            ProcessorExchangePair processorExchangePair;
            ProcessorExchangePair processorExchangePair2 = null;
            while (true) {
                processorExchangePair = processorExchangePair2;
                if (processorExchangePair != null || !this.iterator.hasNext()) {
                    break;
                }
                processorExchangePair2 = this.iterator.next();
            }
            return processorExchangePair;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/camel/processor/MulticastProcessor$MulticastTask.class */
    /**
     * Shared state and completion logic for one multicast of a single incoming exchange.
     * <p>
     * Subclasses implement the sending strategy in {@link #run()}; this base class owns the counters, the completion
     * service results are polled from, the optional timeout task and the single-shot {@code done} flag that guards
     * the final callback.
     */
    public abstract class MulticastTask implements Runnable, Rejectable {
        /** The incoming exchange being multicast. */
        final Exchange original;
        final Iterable<ProcessorExchangePair> pairs;
        final AsyncCallback callback;
        final Iterator<ProcessorExchangePair> iterator;
        final AsyncCompletionService<Exchange> completion;
        /** Snapshot of the MDC taken at construction time, or {@code null} when not applicable. */
        final Map<String, String> mdc;
        /** Handle of the scheduled timeout, or {@code null} when no timeout is configured or no handle is available. */
        final ScheduledFuture<?> timeoutTask;
        /** Guards aggregation and timeout handling; also handed to the completion service. */
        final ReentrantLock lock = new ReentrantLock();
        /** The aggregated result so far. */
        final AtomicReference<Exchange> result = new AtomicReference<>();
        final AtomicInteger nbExchangeSent = new AtomicInteger();
        final AtomicInteger nbAggregated = new AtomicInteger();
        final AtomicBoolean allSent = new AtomicBoolean();
        /** Flipped exactly once by whichever of {@code doDone}/{@code doTimeoutDone} wins. */
        final AtomicBoolean done = new AtomicBoolean();

        /**
         * @param exchange      the incoming exchange
         * @param iterable      the processor/exchange pairs to send
         * @param asyncCallback callback to complete once the multicast is finished
         * @param i             capacity passed to the completion service when positive; otherwise the completion
         *                      service is created without an explicit capacity
         */
        MulticastTask(Exchange exchange, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback, int i) {
            this.original = exchange;
            this.pairs = iterable;
            this.callback = asyncCallback;
            this.iterator = iterable.iterator();
            if (MulticastProcessor.this.timeout > 0) {
                this.timeoutTask = MulticastProcessor.this.schedule(MulticastProcessor.this.aggregateExecutorService, this::timeout, MulticastProcessor.this.timeout, TimeUnit.MILLISECONDS);
            } else {
                this.timeoutTask = null;
            }
            if (MulticastProcessor.this.isParallelProcessing() && exchange.getContext().isUseMDCLogging().booleanValue()) {
                this.mdc = MDC.getCopyOfContextMap();
            } else {
                this.mdc = null;
            }
            if (i > 0) {
                this.completion = new AsyncCompletionService<>(MulticastProcessor.this.scheduler, !MulticastProcessor.this.isStreaming(), this.lock, i);
            } else {
                this.completion = new AsyncCompletionService<>(MulticastProcessor.this.scheduler, !MulticastProcessor.this.isStreaming(), this.lock);
            }
        }

        @Override
        public String toString() {
            return "MulticastTask";
        }

        /** Restores the MDC snapshot (if any) on the executing thread; subclasses call this first. */
        @Override
        public void run() {
            if (this.mdc != null) {
                this.mdc.forEach(MDC::put);
            }
        }

        /**
         * Drains completed exchanges from the completion service and aggregates them, finishing the multicast once
         * everything that was sent has been aggregated.
         * <p>
         * Does nothing when the lock cannot be acquired immediately. The lock is acquired once and released exactly
         * once in a {@code finally}, regardless of how many exchanges are polled (including none).
         */
        protected void aggregate() {
            final ReentrantLock guard = this.lock;
            if (!guard.tryLock()) {
                return;
            }
            try {
                Exchange completed;
                while (!this.done.get() && (completed = this.completion.poll()) != null) {
                    MulticastProcessor.this.doAggregate(this.result, completed, this.original);
                    if (this.nbAggregated.incrementAndGet() >= this.nbExchangeSent.get() && this.allSent.get()) {
                        doDone(this.result.get(), true);
                    }
                }
            } catch (Exception e) {
                this.original.setException(e);
                doDone(null, false);
            } finally {
                guard.unlock();
            }
        }

        /**
         * Invoked when the configured timeout elapses. Aggregates whatever has completed, notifies the aggregation
         * strategy about every index that did not complete in time, and then finishes the multicast.
         * <p>
         * Does nothing when the lock cannot be acquired immediately. The lock is released in a {@code finally} so a
         * failure while completing cannot leave it held.
         */
        protected void timeout() {
            final ReentrantLock guard = this.lock;
            if (!guard.tryLock()) {
                return;
            }
            try {
                while (this.nbAggregated.get() < this.nbExchangeSent.get()) {
                    Exchange completed = this.completion.pollUnordered();
                    // with no completed exchange left, every remaining index up to the sent count has timed out
                    int upTo = completed != null
                            ? MulticastProcessor.this.getExchangeIndex(completed).intValue()
                            : this.nbExchangeSent.get();
                    while (this.nbAggregated.get() < upTo) {
                        int timedOutIndex = this.nbAggregated.getAndIncrement();
                        AggregationStrategy strategy = MulticastProcessor.this.getAggregationStrategy(null);
                        if (strategy != null) {
                            strategy.timeout(this.result.get() != null ? this.result.get() : this.original, timedOutIndex, this.nbExchangeSent.get(), MulticastProcessor.this.timeout);
                        }
                    }
                    if (completed != null) {
                        MulticastProcessor.this.doAggregate(this.result, completed, this.original);
                        this.nbAggregated.incrementAndGet();
                    }
                }
                doTimeoutDone(this.result.get(), true);
            } catch (Exception e) {
                this.original.setException(e);
                doTimeoutDone(null, false);
            } finally {
                guard.unlock();
            }
        }

        /**
         * Completes the multicast from the timeout path. Unlike {@link #doDone(Exchange, boolean)} it does not cancel
         * the timeout task (it is the timeout task that is running).
         *
         * @param exchange the aggregated result, or {@code null}
         * @param z        whether to force redelivery-exhausted on the original exchange
         */
        protected void doTimeoutDone(Exchange exchange, boolean z) {
            if (this.done.compareAndSet(false, true)) {
                MulticastProcessor.this.doDone(this.original, exchange, this.pairs, this.callback, false, z);
            }
        }

        /**
         * Completes the multicast at most once: cancels a pending timeout task and delegates to the outer
         * {@code doDone}.
         *
         * @param exchange the aggregated result, or {@code null}
         * @param z        whether to force redelivery-exhausted on the original exchange
         */
        protected void doDone(Exchange exchange, boolean z) {
            if (this.done.compareAndSet(false, true)) {
                if (this.timeoutTask != null) {
                    try {
                        this.timeoutTask.cancel(true);
                    } catch (Exception e) {
                        MulticastProcessor.LOG.debug("Cancel timeout task caused an exception. This exception is ignored.", e);
                    }
                }
                MulticastProcessor.this.doDone(this.original, exchange, this.pairs, this.callback, false, z);
            }
        }

        /** Fails the original exchange with a {@link RejectedExecutionException} and completes the multicast. */
        @Override
        public void reject() {
            this.original.setException(new RejectedExecutionException("Task rejected executing from ExecutorService"));
            doDone(null, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/camel/processor/MulticastProcessor$MulticastTransactedTask.class */
    /**
     * Task used for transacted, non-parallel exchanges: every pair is processed with the blocking
     * {@code Processor.process(Exchange)} call from within {@link #run()}.
     */
    public class MulticastTransactedTask extends MulticastTask {

        public MulticastTransactedTask(Exchange exchange, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback, int i) {
            super(exchange, iterable, asyncCallback, i);
        }

        /** Loops over {@link #doRun()} until it reports that no further pair should be processed by this call. */
        @Override
        public void run() {
            super.run();
            boolean next = true;
            while (next) {
                try {
                    next = doRun();
                } catch (Exception e) {
                    this.original.setException(e);
                    doDone(null, false);
                    return;
                }
            }
        }

        /**
         * Processes the next pair synchronously.
         *
         * @return {@code true} when the caller should immediately process another pair; {@code false} when the task
         *         is done, was stopped on an exception, or has re-scheduled itself (parallel mode)
         * @throws Exception if sending bookkeeping or result submission fails; handled by {@link #run()}
         */
        boolean doRun() throws Exception {
            if (this.done.get()) {
                return false;
            }
            if (!this.iterator.hasNext()) {
                doDone(this.result.get(), true);
                return false;
            }
            ProcessorExchangePair pair = this.iterator.next();
            boolean hasNext = this.iterator.hasNext();
            Exchange exchange = pair.getExchange();
            int index = this.nbExchangeSent.getAndIncrement();
            MulticastProcessor.this.updateNewExchange(exchange, index, this.pairs, hasNext);
            if (!hasNext) {
                this.allSent.set(true);
            }

            StopWatch watch = MulticastProcessor.this.beforeSend(pair);
            try {
                pair.getProcessor().process(exchange);
            } catch (Exception e) {
                // a processing failure is recorded on the sub exchange and evaluated below
                exchange.setException(e);
            } finally {
                // exactly one afterSend per beforeSend, whatever the outcome of the processing
                MulticastProcessor.this.afterSend(pair, watch);
            }

            // the failure message is only built when it would actually be logged
            String failureMessage = null;
            if (MulticastProcessor.LOG.isDebugEnabled()) {
                failureMessage = "Multicast processing failed for number " + index;
            }
            boolean continueProcessing = PipelineHelper.continueProcessing(exchange, failureMessage, MulticastProcessor.LOG);
            if (MulticastProcessor.this.stopOnException && !continueProcessing) {
                if (exchange.getException() != null) {
                    exchange.setException(new CamelExchangeException("Multicast processing failed for number " + index, exchange, exchange.getException()));
                } else {
                    this.result.set(exchange);
                }
                doDone(exchange, true);
                return false;
            }

            this.completion.submit(consumer -> {
                consumer.accept(exchange);
                aggregate();
            });
            if (hasNext && MulticastProcessor.this.isParallelProcessing()) {
                MulticastProcessor.this.schedule(this);
            }
            boolean runNext = hasNext && !MulticastProcessor.this.isParallelProcessing();
            MulticastProcessor.LOG.trace("Run next: {}", Boolean.valueOf(runNext));
            return runNext;
        }
    }

    /* loaded from: input_file:org/apache/camel/processor/MulticastProcessor$Scheduler.class */
    /** {@link Executor} view of this processor: every submitted runnable is routed through {@code schedule(Runnable)}. */
    private final class Scheduler implements Executor {

        @Override
        public void execute(Runnable runnable) {
            schedule(runnable);
        }
    }

    /**
     * Creates a multicast without an aggregation strategy.
     *
     * @param camelContext the context; must not be null
     * @param route        the owning route; may be null
     * @param collection   the processors to multicast to
     */
    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> collection) {
        this(camelContext, route, collection, (AggregationStrategy) null);
    }

    /**
     * Creates a sequential multicast with default options: no parallel processing, no executor, no streaming, no
     * stop-on-exception, no timeout, no on-prepare processor, no shared unit of work, no parallel aggregation, and a
     * cache size taken from the context's maximum cache pool size.
     *
     * @param camelContext        the context; must not be null
     * @param route               the owning route; may be null
     * @param collection          the processors to multicast to
     * @param aggregationStrategy strategy used to combine the results; may be null
     */
    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> collection, AggregationStrategy aggregationStrategy) {
        this(
                camelContext,
                route,
                collection,
                aggregationStrategy,
                false,  // parallel processing
                null,   // executor service
                false,  // shutdown executor service
                false,  // streaming
                false,  // stop on exception
                0L,     // timeout
                null,   // on prepare
                false,  // share unit of work
                false,  // parallel aggregate
                CamelContextHelper.getMaximumCachePoolSize(camelContext));
    }

    /**
     * Fully parameterised constructor.
     *
     * @param camelContext        the context; must not be null
     * @param route               the owning route; may be null
     * @param collection          the processors to multicast to
     * @param aggregationStrategy strategy used to combine the results; may be null
     * @param z                   request parallel processing (also implied by a non-null {@code executorService})
     * @param executorService     executor used for parallel processing; may be null
     * @param z2                  stored as the shutdown-executor-service flag
     * @param z3                  streaming mode
     * @param z4                  stop on exception
     * @param j                   timeout in milliseconds; values {@code <= 0} disable the timeout
     * @param processor           on-prepare processor; may be null
     * @param z5                  share unit of work
     * @param z6                  parallel aggregate
     * @param i                   cache size; a negative value disables the error handler cache
     */
    public MulticastProcessor(CamelContext camelContext, Route route, Collection<Processor> collection, AggregationStrategy aggregationStrategy, boolean z, ExecutorService executorService, boolean z2, boolean z3, boolean z4, long j, Processor processor, boolean z5, boolean z6, int i) {
        scheduler = new Scheduler();
        ObjectHelper.notNull(camelContext, "camelContext");

        // collaborators resolved from the context
        this.camelContext = camelContext;
        internalProcessorFactory = PluginHelper.getInternalProcessorFactory(camelContext);
        awaitManager = PluginHelper.getAsyncProcessorAwaitManager(camelContext);
        this.route = route;
        reactiveExecutor = camelContext.getCamelContextExtension().getReactiveExecutor();

        // targets and aggregation
        processors = collection;
        this.aggregationStrategy = aggregationStrategy;
        parallelAggregate = z6;

        // execution options; supplying an executor implies parallel processing
        this.executorService = executorService;
        shutdownExecutorService = z2;
        parallelProcessing = z || executorService != null;
        streaming = z3;
        stopOnException = z4;
        timeout = j;
        onPrepare = processor;
        shareUnitOfWork = z5;

        processorExchangeFactory = camelContext.getCamelContextExtension().getProcessorExchangeFactory().newProcessorExchangeFactory(this);

        // a negative cache size means "no error handler cache"
        cacheSize = i;
        errorHandlers = i >= 0 ? (ConcurrentMap) LRUCacheFactory.newLRUCache(i) : null;
    }

    /** @return the id of this processor (may be {@code null} when no id has been set) */
    @Override
    public String toString() {
        return id;
    }

    /** @return the id assigned through {@link #setId(String)}, or {@code null} when none was set */
    @Override
    public String getId() {
        return id;
    }

    /** @param str the id to assign to this processor */
    @Override
    public void setId(String str) {
        id = str;
    }

    /** @return the route id assigned through {@link #setRouteId(String)}, or {@code null} when none was set */
    @Override
    public String getRouteId() {
        return routeId;
    }

    /** @param str the id of the route this processor belongs to */
    @Override
    public void setRouteId(String str) {
        routeId = str;
    }

    /** @param processor the error handler to associate with this processor */
    @Override
    public void setErrorHandler(Processor processor) {
        errorHandler = processor;
    }

    /** @return the error handler set through {@link #setErrorHandler(Processor)}, or {@code null} when none was set */
    @Override
    public Processor getErrorHandler() {
        return errorHandler;
    }

    /** @return the fixed trace label {@code "multicast"} */
    @Override
    public String getTraceLabel() {
        final String label = "multicast";
        return label;
    }

    /** @return the context this processor was created with; never {@code null} */
    public CamelContext getCamelContext() {
        return camelContext;
    }

    /** @return whether {@code process} waits for the multicast to finish before returning */
    public boolean isSynchronous() {
        return synchronous;
    }

    /** @param z {@code true} to make {@code process} wait for the multicast to finish before returning */
    public void setSynchronous(boolean z) {
        synchronous = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Propagates this processor's id and route id to the exchange factory (when present) and then builds it.
     *
     * @throws Exception if building the exchange factory fails
     */
    @Override
    public void doBuild() throws Exception {
        final ProcessorExchangeFactory factory = processorExchangeFactory;
        if (factory != null) {
            factory.setId(id);
            factory.setRouteId(routeId);
        }
        ServiceHelper.buildService(factory);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * When a route is known, runs every configured processor through {@code wrapInErrorHandler} once up front, using
     * a throw-away exchange, and then initialises the exchange factory.
     *
     * @throws Exception if wrapping a processor or initialising the exchange factory fails
     */
    @Override
    public void doInit() throws Exception {
        if (route != null) {
            final DefaultExchange dummy = new DefaultExchange(getCamelContext());
            for (Processor target : getProcessors()) {
                // the returned wrapper is not used here
                wrapInErrorHandler(route, dummy, target);
            }
        }
        ServiceHelper.initService(processorExchangeFactory);
    }

    /**
     * Entry point of the multicast.
     * <p>
     * In the default (asynchronous) mode this simply delegates to {@link #doProcess(Exchange, AsyncCallback)}, which
     * owns the callback. In synchronous mode the multicast is driven through the await manager, any exception is
     * recorded on the exchange, and the callback is completed exactly once from a {@code finally} block.
     *
     * @param exchange      the exchange to multicast
     * @param asyncCallback callback to complete when processing is finished
     * @return {@code true} when the callback was completed synchronously by this call; otherwise whatever
     *         {@code doProcess} returns
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        if (!this.synchronous) {
            return doProcess(exchange, asyncCallback);
        }
        try {
            this.awaitManager.process(new AsyncProcessorSupport() {
                @Override
                public boolean process(Exchange exchange2, AsyncCallback asyncCallback2) {
                    // call doProcess directly; going through the outer process(..) would recurse
                    return MulticastProcessor.this.doProcess(exchange2, asyncCallback2);
                }
            }, exchange);
        } catch (Exception e) {
            exchange.setException(e);
        } finally {
            // single completion point: never invoke the callback more than once
            asyncCallback.done(true);
        }
        return true;
    }

    /**
     * Creates the processor/exchange pairs for the given exchange, builds the matching task and hands it to the
     * appropriate executor.
     * <ul>
     * <li>parallel: submitted to the executor service; a rejection is reported through the task's {@code reject()}</li>
     * <li>transacted (non-parallel): queued on the reactive executor</li>
     * <li>otherwise: scheduled on the reactive executor's main queue</li>
     * </ul>
     *
     * @param exchange      the exchange to multicast
     * @param asyncCallback callback to complete when the multicast is finished
     * @return {@code false} once a task has been handed off; {@code true} when setting up the task failed and the
     *         failure was recorded on the exchange
     */
    protected boolean doProcess(Exchange exchange, AsyncCallback asyncCallback) {
        try {
            Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
            int capacity = 0;
            if (pairs instanceof Collection) {
                // drop null entries up front so the size can be used as the completion service capacity
                final List<ProcessorExchangePair> present = ((Collection<ProcessorExchangePair>) pairs).stream()
                        .filter(Objects::nonNull)
                        .toList();
                pairs = present;
                capacity = present.size();
            }

            final MulticastTask task;
            if (isParallelProcessing() || !exchange.isTransacted()) {
                task = new MulticastReactiveTask(exchange, pairs, asyncCallback, capacity);
            } else {
                task = new MulticastTransactedTask(exchange, pairs, asyncCallback, capacity);
            }

            if (isParallelProcessing()) {
                try {
                    executorService.submit(() -> {
                        reactiveExecutor.scheduleSync(task);
                    });
                } catch (RejectedExecutionException rejected) {
                    task.reject();
                }
            } else if (exchange.isTransacted()) {
                reactiveExecutor.scheduleQueue(task);
            } else {
                reactiveExecutor.scheduleMain(task);
            }
            return false;
        } catch (Exception failure) {
            exchange.setException(failure);
            doDone(exchange, null, null, asyncCallback, true, false);
            return true;
        }
    }

    /**
     * Schedules a task for execution: on the reactive executor when running sequentially, or on the executor service
     * (wrapped by {@code prepareParallelTask}) when parallel processing is enabled.
     * <p>
     * If the executor service rejects the task and the task is {@link Rejectable}, its {@code reject()} is invoked;
     * a rejected task of any other type is dropped.
     *
     * @param runnable the task to run
     */
    protected void schedule(Runnable runnable) {
        if (isParallelProcessing()) {
            final Runnable prepared = prepareParallelTask(runnable);
            try {
                executorService.submit(() -> {
                    reactiveExecutor.scheduleSync(prepared);
                });
            } catch (RejectedExecutionException rejected) {
                if (runnable instanceof Rejectable) {
                    ((Rejectable) runnable).reject();
                }
            }
        } else {
            reactiveExecutor.schedule(runnable);
        }
    }

    /**
     * Wraps a task so that the MDC of the submitting thread is propagated to the thread that eventually runs it.
     * <p>
     * The task is returned unchanged when MDC logging is disabled or the current MDC is empty. Otherwise the wrapper
     * copies either all MDC entries (no pattern, or the wildcard pattern) or only the entries whose key matches one
     * of the comma-separated patterns, and then runs the task exactly once, even if the MDC copy fails.
     *
     * @param runnable the task to run
     * @return the task itself, or a wrapper that restores the MDC before running it
     */
    private Runnable prepareParallelTask(Runnable runnable) {
        if (!this.camelContext.isUseMDCLogging().booleanValue()) {
            return runnable;
        }
        final String keysPattern = this.camelContext.getMDCLoggingKeysPattern();
        final Map<String, String> mdc = MDC.getCopyOfContextMap();
        if (mdc == null || mdc.isEmpty()) {
            return runnable;
        }
        return () -> {
            try {
                // NOTE(review): the wildcard is compared via RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_ORIGIN,
                // presumably "*"; this looks like a decompiler constant substitution. Confirm.
                if (keysPattern == null || RestConfiguration.CORS_ACCESS_CONTROL_ALLOW_ORIGIN.equals(keysPattern)) {
                    mdc.forEach(MDC::put);
                } else {
                    final String[] patterns = keysPattern.split(",");
                    mdc.forEach((key, value) -> {
                        if (PatternHelper.matchPatterns(key, patterns)) {
                            MDC.put(key, value);
                        }
                    });
                }
            } finally {
                // single invocation point for the wrapped task
                runnable.run();
            }
        };
    }

    /**
     * Runs a task after a delay.
     * <p>
     * A {@link ScheduledExecutorService} schedules the task natively and the resulting future is returned. Any other
     * executor gets a task that sleeps for the requested delay, expressed in {@code timeUnit}, and then runs the
     * task; no handle is available in that case. If the sleep is interrupted, the interrupt flag is restored and the
     * task still runs.
     *
     * @param executor the executor to run the task on
     * @param runnable the task to run
     * @param j        the delay, expressed in {@code timeUnit}
     * @param timeUnit the unit of the delay
     * @return the scheduled future, or {@code null} when the executor is not a {@link ScheduledExecutorService}
     */
    protected ScheduledFuture<?> schedule(Executor executor, Runnable runnable, long j, TimeUnit timeUnit) {
        if (executor instanceof ScheduledExecutorService) {
            return ((ScheduledExecutorService) executor).schedule(runnable, j, timeUnit);
        }
        executor.execute(() -> {
            try {
                // honour the caller's unit instead of assuming milliseconds
                timeUnit.sleep(j);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            runnable.run();
        });
        return null;
    }

    /**
     * Hook invoked right before an exchange is sent to its processor: fires the "exchange sending" event when the
     * target is a producer and then calls {@code begin()} on the pair.
     *
     * @param processorExchangePair the pair about to be processed
     * @return a started stop watch when the target is a producer and the sending notification returned
     *         {@code true}; {@code null} otherwise
     */
    protected StopWatch beforeSend(ProcessorExchangePair processorExchangePair) {
        final Exchange exchange = processorExchangePair.getExchange();
        final Producer producer = processorExchangePair.getProducer();
        StopWatch watch = null;
        if (producer != null && EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint())) {
            watch = new StopWatch();
        }
        processorExchangePair.begin();
        return watch;
    }

    /**
     * Hook invoked after an exchange has been sent: calls {@code done()} on the pair and, when a stop watch was
     * started for a producer target, fires the "exchange sent" event with the elapsed time.
     *
     * @param processorExchangePair the pair that was processed
     * @param stopWatch             the watch returned by {@link #beforeSend(ProcessorExchangePair)}; may be null
     */
    protected void afterSend(ProcessorExchangePair processorExchangePair, StopWatch stopWatch) {
        processorExchangePair.done();
        final Producer producer = processorExchangePair.getProducer();
        if (producer != null && stopWatch != null) {
            final long elapsed = stopWatch.taken();
            final Exchange exchange = processorExchangePair.getExchange();
            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, producer.getEndpoint(), elapsed);
        }
    }

    /**
     * Finishes a multicast: notifies the aggregation strategy, copies the outcome onto the original exchange,
     * releases the per-target exchanges, closes the pairs when they are {@link Closeable} and schedules the callback.
     *
     * @param exchange      the original (incoming) exchange
     * @param exchange2     the aggregated/sub exchange carrying the result; may be null
     * @param iterable      the pairs whose exchanges should be released; may be null
     * @param asyncCallback the callback to schedule on the reactive executor
     * @param z             not used by this implementation
     * @param z2            forces the redelivery-exhausted flag to {@code true} when a failure is recorded
     */
    protected void doDone(Exchange exchange, Exchange exchange2, Iterable<ProcessorExchangePair> iterable, AsyncCallback asyncCallback, boolean z, boolean z2) {
        final AggregationStrategy strategy = getAggregationStrategy(exchange2);
        if (strategy != null) {
            strategy.onCompletion(exchange2, exchange);
        }
        removeAggregationStrategyFromExchange(exchange);

        final boolean exhaust = z2
                || (exchange2 != null && (exchange2.getException() != null || exchange2.getExchangeExtension().isRedeliveryExhausted()));
        final boolean failed = exchange.getException() != null || (exchange2 != null && exchange2.getException() != null);
        // stop-on-exception is only consulted when there actually was a failure
        final boolean stoppedOnException = failed && isStopOnException();

        if (exchange2 != null) {
            if (stoppedOnException) {
                // only the exception is propagated back to the original
                exchange.setException(exchange2.getException());
            } else {
                // copy the result but keep the original's correlation id
                final Object correlationId = exchange.removeProperty(ExchangePropertyKey.CORRELATION_ID);
                ExchangeHelper.copyResults(exchange, exchange2);
                if (correlationId != null) {
                    exchange.setProperty(ExchangePropertyKey.CORRELATION_ID, correlationId);
                }
            }
        }

        if (processorExchangeFactory != null && iterable != null) {
            try {
                for (ProcessorExchangePair pair : iterable) {
                    processorExchangeFactory.release(pair.getExchange());
                }
            } catch (Exception e) {
                LOG.warn("Error releasing exchange due to {}. This exception is ignored.", e.getMessage(), e);
            }
        }
        if (iterable instanceof Closeable) {
            IOHelper.close((Closeable) iterable, "pairs", LOG);
        }

        if (failed) {
            exchange.getExchangeExtension().setRedeliveryExhausted(exhaust);
        }
        reactiveExecutor.schedule(asyncCallback);
    }

    /**
     * Aggregates one result into the shared result holder, either directly (when
     * {@code parallelAggregate} is set) or through the synchronized path.
     *
     * @param atomicReference holder of the aggregated result so far
     * @param exchange        the exchange to aggregate; also used to look up the strategy
     * @param exchange2       passed through unchanged to the aggregation strategy
     */
    protected void doAggregate(AtomicReference<Exchange> atomicReference, Exchange exchange, Exchange exchange2) {
        final boolean unsynchronized = this.parallelAggregate;
        final AggregationStrategy strategy = getAggregationStrategy(exchange);
        if (!unsynchronized) {
            doAggregateSync(strategy, atomicReference, exchange, exchange2);
            return;
        }
        doAggregateInternal(strategy, atomicReference, exchange, exchange2);
    }

    /**
     * Serialized aggregation path: holds this processor's monitor while delegating to
     * {@code doAggregateInternal}, so only one thread at a time aggregates through this method.
     *
     * @param aggregationStrategy the strategy to apply; may be {@code null}
     * @param atomicReference     holder of the aggregated result so far
     * @param exchange            the exchange to aggregate
     * @param exchange2           passed through unchanged to the aggregation strategy
     */
    private synchronized void doAggregateSync(AggregationStrategy aggregationStrategy, AtomicReference<Exchange> atomicReference, Exchange exchange, Exchange exchange2) {
        doAggregateInternal(aggregationStrategy, atomicReference, exchange, exchange2);
    }

    /**
     * Applies the aggregation strategy to the current result and stores the outcome back into
     * the holder. Does nothing when no strategy is given.
     * <p>
     * The holder is read and written in two separate steps, so this method by itself does not
     * make the update atomic.
     *
     * @param aggregationStrategy the strategy to apply; may be {@code null}
     * @param atomicReference     holder of the aggregated result so far
     * @param exchange            the exchange to aggregate
     * @param exchange2           passed as the third argument to the strategy's aggregate call
     */
    private void doAggregateInternal(AggregationStrategy aggregationStrategy, AtomicReference<Exchange> atomicReference, Exchange exchange, Exchange exchange2) {
        if (aggregationStrategy == null) {
            return;
        }
        final Exchange previous = atomicReference.get();
        ExchangeHelper.prepareAggregation(previous, exchange);
        final Exchange merged = aggregationStrategy.aggregate(previous, exchange, exchange2);
        atomicReference.set(merged);
    }

    /**
     * Stamps a newly created exchange with its multicast index and completion marker.
     *
     * @param exchange the exchange to update
     * @param i        value stored under {@code MULTICAST_INDEX}
     * @param iterable all pairs; not read by this implementation
     * @param z        when {@code true}, {@code MULTICAST_COMPLETE} is set to {@code FALSE};
     *                 otherwise it is set to {@code TRUE}
     */
    protected void updateNewExchange(Exchange exchange, int i, Iterable<ProcessorExchangePair> iterable, boolean z) {
        exchange.setProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.valueOf(i));
        final Boolean complete = z ? Boolean.FALSE : Boolean.TRUE;
        exchange.setProperty(ExchangePropertyKey.MULTICAST_COMPLETE, complete);
    }

    /**
     * Reads the {@code MULTICAST_INDEX} property from the given exchange.
     *
     * @param exchange the exchange to inspect
     * @return the property value as an {@link Integer}, exactly as returned by the exchange's
     *         property lookup
     */
    protected Integer getExchangeIndex(Exchange exchange) {
        return (Integer) exchange.getProperty(ExchangePropertyKey.MULTICAST_INDEX, Integer.class);
    }

    /**
     * Creates one processor/exchange pair per configured processor, each with its own correlated
     * copy of the given exchange.
     *
     * @param exchange the exchange to copy for every processor
     * @return the created pairs, in processor iteration order
     * @throws Exception the exception found on {@code exchange} after all pairs were created,
     *                   or any exception raised while copying the stream cache
     */
    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
        final List<ProcessorExchangePair> pairs = new ArrayList<>(this.processors.size());
        // created lazily and then shared by every copy that needs transaction context data
        Map<String, Object> sharedTxData = null;

        StreamCache sourceCache = null;
        if (isParallelProcessing() && (exchange.getIn().getBody() instanceof StreamCache)) {
            sourceCache = (StreamCache) exchange.getIn().getBody();
        }

        int index = 0;
        for (Processor processor : this.processors) {
            final Exchange copy = this.processorExchangeFactory.createCorrelatedCopy(exchange, false);
            copy.getExchangeExtension().setTransacted(exchange.isTransacted());
            if (exchange.isTransacted() && copy.getProperty(Exchange.TRANSACTION_CONTEXT_DATA) == null) {
                if (sharedTxData == null) {
                    sharedTxData = new ConcurrentHashMap<>();
                }
                copy.setProperty(Exchange.TRANSACTION_CONTEXT_DATA, sharedTxData);
            }

            // the first copy keeps the body it was created with; later ones get their own stream copy
            if (sourceCache != null && index > 0) {
                final StreamCache cacheCopy = sourceCache.copy(copy);
                if (cacheCopy != null) {
                    copy.getIn().setBody(cacheCopy);
                }
            }

            if (copy.getProperty(ExchangePropertyKey.STREAM_CACHE_UNIT_OF_WORK) == null) {
                copy.setProperty(ExchangePropertyKey.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork());
            }
            if (isShareUnitOfWork()) {
                prepareSharedUnitOfWork(copy, exchange);
            }

            pairs.add(createProcessorExchangePair(index++, processor, copy, ExchangeHelper.getRoute(exchange)));
        }

        // surface any exception present on the source exchange once all pairs exist
        if (exchange.getException() != null) {
            throw exchange.getException();
        }
        return pairs;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the pair for a single processor: records the target endpoint on the exchange, wraps
     * the processor for error handling and runs the optional {@code onPrepare} hook.
     * <p>
     * An exception thrown by the hook is not propagated; it is stored on the exchange instead.
     *
     * @param i         index given to the new pair
     * @param processor the processor to pair with the exchange
     * @param exchange  the exchange the processor will receive
     * @param route     the route passed to the error-handler wrapping
     * @return a new {@code DefaultProcessorExchangePair}
     */
    public ProcessorExchangePair createProcessorExchangePair(int i, Processor processor, Exchange exchange, Route route) {
        setToEndpoint(exchange, processor);
        final Processor guarded = wrapInErrorHandler(route, exchange, processor);
        if (this.onPrepare != null) {
            try {
                this.onPrepare.process(exchange);
            } catch (Exception prepareFailure) {
                exchange.setException(prepareFailure);
            }
        }
        return new DefaultProcessorExchangePair(i, processor, guarded, exchange);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the processor to actually invoke for {@code processor}: either a plain
     * unit-of-work wrapper, or an error-handler plus unit-of-work wrapper that may be cached.
     * <p>
     * No error handler is added when {@code route} is {@code null} or the exchange's
     * {@code TRY_ROUTE_BLOCK} property is {@code true}. Otherwise a cached wrapper is reused when
     * available; a newly built one is started and cached only when the exchange has no
     * {@code PARENT_UNIT_OF_WORK}. The result of {@code putIfAbsent} is ignored, so the wrapper
     * built by this call is returned even if another entry was cached in the meantime.
     *
     * @param route     the route of the exchange; must be this processor's route when one is set
     * @param exchange  the exchange whose properties steer the wrapping
     * @param processor the processor to wrap; also the cache key
     * @return the wrapped processor
     * @throws UnsupportedOperationException if this processor has a route and {@code route} is a
     *                                       different instance
     * @throws RuntimeCamelException         wrapping any exception raised while building or
     *                                       starting the wrapper
     */
    public Processor wrapInErrorHandler(Route route, Exchange exchange, Processor processor) {
        if (this.route != null && route != this.route) {
            throw new UnsupportedOperationException("Is this really correct ?");
        }

        final Boolean insideTryBlock = (Boolean) exchange.getProperty(ExchangePropertyKey.TRY_ROUTE_BLOCK);
        if (route == null || Boolean.TRUE.equals(insideTryBlock)) {
            return createUnitOfWorkProcessor(route, processor, exchange);
        }

        final Processor cached = this.errorHandlers == null ? null : this.errorHandlers.get(processor);
        if (cached != null) {
            LOG.trace("Using existing error handler for: {}", processor);
            return cached;
        }

        LOG.trace("Creating error handler for: {}", processor);
        try {
            final Processor guarded = wrapInErrorHandler(route, processor);
            final Processor wrapped = createUnitOfWorkProcessor(route, guarded, exchange);
            final boolean hasParentUnitOfWork = exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null;
            ServiceHelper.startService(wrapped);
            // wrappers bound to a parent unit of work are not cached
            if (!hasParentUnitOfWork && this.errorHandlers != null) {
                this.errorHandlers.putIfAbsent(processor, wrapped);
            }
            return wrapped;
        } catch (Exception e) {
            throw RuntimeCamelException.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Wraps the processor in an error handler. When the configured error handler is an
     * {@code ErrorHandlerSupport} it is cloned around the processor; otherwise the Camel context
     * extension is asked to create one for the route.
     *
     * @param route     the route passed to the context when a new error handler is created
     * @param processor the processor to wrap
     * @return the error-handling processor
     * @throws Exception if cloning or creating the error handler fails
     */
    private Processor wrapInErrorHandler(Route route, Processor processor) throws Exception {
        if (this.errorHandler instanceof ErrorHandlerSupport) {
            final ErrorHandlerSupport prototype = (ErrorHandlerSupport) this.errorHandler;
            return prototype.clone(processor);
        }
        return this.camelContext.getCamelContextExtension().createErrorHandler(route, processor);
    }

    /**
     * Adds unit-of-work advice to the processor. If the exchange carries a
     * {@code PARENT_UNIT_OF_WORK} property, child advice bound to that parent is added;
     * otherwise regular unit-of-work advice is added.
     *
     * @param route     the route passed to the internal processor factory
     * @param processor the processor to wrap
     * @param exchange  the exchange inspected for a parent unit of work
     * @return the processor returned by the internal processor factory
     */
    protected Processor createUnitOfWorkProcessor(Route route, Processor processor, Exchange exchange) {
        final UnitOfWork parent = (UnitOfWork) exchange.getProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, UnitOfWork.class);
        if (parent == null) {
            return this.internalProcessorFactory.addUnitOfWorkProcessorAdvice(this.camelContext, processor, route);
        }
        return this.internalProcessorFactory.addChildUnitOfWorkProcessorAdvice(this.camelContext, processor, route, parent);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stores the unit of work of {@code exchange2} on {@code exchange} under the
     * {@code PARENT_UNIT_OF_WORK} property.
     *
     * @param exchange  the exchange that receives the property
     * @param exchange2 the exchange whose unit of work is shared
     */
    public void prepareSharedUnitOfWork(Exchange exchange, Exchange exchange2) {
        final UnitOfWork parentUnitOfWork = exchange2.getUnitOfWork();
        exchange.setProperty(ExchangePropertyKey.PARENT_UNIT_OF_WORK, parentUnitOfWork);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Validates the configuration and starts the services this processor depends on.
     * <p>
     * When a positive timeout is configured and no aggregate executor service was supplied, one
     * is created here and flagged so that this processor shuts it down again.
     *
     * @throws IllegalArgumentException if parallel processing is enabled without an executor
     *                                  service
     * @throws Exception                if starting a dependent service fails
     */
    @Override // org.apache.camel.support.service.BaseService
    public void doStart() throws Exception {
        final boolean executorMissing = isParallelProcessing() && this.executorService == null;
        if (executorMissing) {
            throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set");
        }

        final boolean aggregateExecutorNeeded = this.timeout > 0 && this.aggregateExecutorService == null;
        if (aggregateExecutorNeeded) {
            final String poolName = getClass().getSimpleName() + "-AggregateTask";
            this.aggregateExecutorService = createAggregateExecutorService(poolName);
            this.shutdownAggregateExecutorService = true;
        }

        CamelContextAware.trySetCamelContext(this.aggregationStrategy, this.camelContext);
        ServiceHelper.startService(this.aggregationStrategy, this.processors, this.processorExchangeFactory);
    }

    /**
     * Creates the executor service used for aggregation by asking the Camel context's executor
     * service manager for a new scheduled thread pool.
     *
     * @param str the name handed to the executor service manager for the new pool
     * @return the newly created pool; the manager is called with {@code 0} as its last argument
     *         (presumably the pool size - confirm against the manager's API)
     */
    protected synchronized ExecutorService createAggregateExecutorService(String str) {
        final ExecutorService scheduledPool = this.camelContext.getExecutorServiceManager().newScheduledThreadPool(this, str, 0);
        return scheduledPool;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the processors, the cached error handlers, the aggregation strategy and the
     * processor exchange factory.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.camel.support.service.BaseService
    public void doStop() throws Exception {
        ServiceHelper.stopService(this.processors, this.errorHandlers, this.aggregationStrategy, this.processorExchangeFactory);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops and shuts down the dependent services, empties the error-handler cache and shuts
     * down the executor services that are flagged as owned by this processor.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.camel.support.service.BaseService
    public void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(this.processors, this.errorHandlers, this.aggregationStrategy, this.processorExchangeFactory);

        if (this.errorHandlers != null) {
            this.errorHandlers.clear();
        }

        // executors are only shut down when their shutdown flag is set
        if (this.shutdownExecutorService && this.executorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(this.executorService);
        }
        if (this.shutdownAggregateExecutorService && this.aggregateExecutorService != null) {
            getCamelContext().getExecutorServiceManager().shutdownNow(this.aggregateExecutorService);
        }
    }

    /**
     * Records the endpoint URI of the target on the exchange under the {@code TO_ENDPOINT}
     * property. Nothing is recorded when the processor is not a {@link Producer}.
     *
     * @param exchange  the exchange that receives the property
     * @param processor the processor about to receive the exchange
     */
    protected static void setToEndpoint(Exchange exchange, Processor processor) {
        if (!(processor instanceof Producer)) {
            return;
        }
        final Producer producer = (Producer) processor;
        exchange.setProperty(ExchangePropertyKey.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
    }

    /**
     * Resolves the aggregation strategy to use for the given exchange.
     * <p>
     * A strategy registered for this processor in the exchange's {@code AGGREGATION_STRATEGY}
     * property map takes precedence; otherwise the strategy configured on this processor is
     * returned.
     *
     * @param exchange the exchange to inspect; may be {@code null}
     * @return the per-exchange strategy if present, else the configured one (which may itself
     *         be {@code null})
     */
    protected AggregationStrategy getAggregationStrategy(Exchange exchange) {
        AggregationStrategy strategy = null;
        if (exchange != null) {
            final Map<Object, AggregationStrategy> perExchange = CastUtils.cast((Map<?, ?>) exchange.getProperty(ExchangePropertyKey.AGGREGATION_STRATEGY, Map.class));
            if (perExchange != null) {
                // entries are keyed by the processor instance that registered them
                strategy = perExchange.get(this);
            }
        }
        if (strategy == null) {
            strategy = getAggregationStrategy();
        }
        return strategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Registers an aggregation strategy for this processor on the given exchange.
     * <p>
     * The map found under the {@code AGGREGATION_STRATEGY} property is never modified: a new
     * {@link ConcurrentHashMap} (seeded with the existing entries, if any) receives the entry and
     * replaces the property value.
     *
     * @param exchange            the exchange to update
     * @param aggregationStrategy the strategy to store, keyed by this processor instance
     */
    public void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
        final Map<Object, AggregationStrategy> existing = CastUtils.cast((Map<?, ?>) exchange.getProperty(ExchangePropertyKey.AGGREGATION_STRATEGY, Map.class));
        final Map<Object, AggregationStrategy> updated;
        if (existing == null) {
            updated = new ConcurrentHashMap<>();
        } else {
            updated = new ConcurrentHashMap<>(existing);
        }
        updated.put(this, aggregationStrategy);
        exchange.setProperty(ExchangePropertyKey.AGGREGATION_STRATEGY, updated);
    }

    /**
     * Removes the aggregation strategy registered for this processor from the map stored under
     * the exchange's {@code AGGREGATION_STRATEGY} property. Does nothing when the property is
     * absent. The map is modified in place.
     *
     * @param exchange the exchange to clean up
     */
    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
        final Map<Object, AggregationStrategy> strategies = CastUtils.cast((Map<?, ?>) exchange.getProperty(ExchangePropertyKey.AGGREGATION_STRATEGY, Map.class));
        if (strategies != null) {
            strategies.remove(this);
        }
    }

    /**
     * @return the value of this processor's {@code streaming} flag
     */
    public boolean isStreaming() {
        return this.streaming;
    }

    /**
     * @return the value of this processor's {@code stopOnException} flag
     */
    public boolean isStopOnException() {
        return this.stopOnException;
    }

    /**
     * @return the processors held by this instance; the backing collection itself is returned,
     *         not a copy
     */
    public Collection<Processor> getProcessors() {
        return this.processors;
    }

    /**
     * @return the configured timeout; the unit is not visible in this part of the file
     */
    public long getTimeout() {
        return this.timeout;
    }

    /**
     * @return the value of this processor's {@code cacheSize} field
     */
    public int getCacheSize() {
        return this.cacheSize;
    }

    /**
     * @return the aggregation strategy configured on this processor; may be {@code null}
     */
    public AggregationStrategy getAggregationStrategy() {
        return this.aggregationStrategy;
    }

    /**
     * @return the value of this processor's {@code parallelProcessing} flag
     */
    public boolean isParallelProcessing() {
        return this.parallelProcessing;
    }

    /**
     * @return the value of this processor's {@code parallelAggregate} flag
     * @deprecated marked deprecated since 4.7.0 by the annotation below; no replacement is
     *             visible in this part of the file
     */
    @Deprecated(since = "4.7.0")
    public boolean isParallelAggregate() {
        return this.parallelAggregate;
    }

    /**
     * @return the value of this processor's {@code shareUnitOfWork} flag
     */
    public boolean isShareUnitOfWork() {
        return this.shareUnitOfWork;
    }

    /**
     * @return the executor service currently set for aggregation; may be {@code null}
     */
    public ExecutorService getAggregateExecutorService() {
        return this.aggregateExecutorService;
    }

    /**
     * Sets the executor service used for aggregation and clears the
     * {@code shutdownAggregateExecutorService} flag.
     *
     * @param executorService the executor service to use
     */
    public void setAggregateExecutorService(ExecutorService executorService) {
        this.aggregateExecutorService = executorService;
        // an executor supplied through this setter is not flagged for shutdown by this processor
        this.shutdownAggregateExecutorService = false;
    }

    /**
     * Returns a snapshot of the processors for navigation.
     *
     * @return a new list containing the processors, or {@code null} when {@link #hasNext()} is
     *         {@code false}
     */
    @Override // org.apache.camel.Navigate
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        // copy so callers cannot modify the processor collection held by this instance
        return new ArrayList<>(this.processors);
    }

    /**
     * @return {@code true} when the processor collection is set and contains at least one
     *         processor
     */
    @Override // org.apache.camel.Navigate
    public boolean hasNext() {
        return this.processors != null && !this.processors.isEmpty();
    }
}
