/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.processor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.processor.PipelineHelper;
import org.apache.camel.processor.PooledExchangeTask;
import org.apache.camel.processor.PooledExchangeTaskFactory;
import org.apache.camel.processor.PooledTaskFactory;
import org.apache.camel.processor.PrototypeTaskFactory;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.ReactiveExecutor;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processor that routes an exchange through an ordered list of processors, one after another,
 * reusing the same {@link Exchange} instance for every step.
 *
 * <p>Before each step any OUT message present on the exchange is moved to IN, so every processor
 * sees the previous step's result as its input. Routing stops early when the exchange is marked
 * as route-stop or when {@link PipelineHelper#continueProcessing} returns {@code false}.
 *
 * <p>The per-exchange routing state lives in a {@link PooledExchangeTask}; the task factory that
 * supplies those tasks is created in {@link #doBuild()}, so the pipeline must be built before
 * {@link #process(Exchange, AsyncCallback)} is used.
 */
public class Pipeline extends AsyncProcessorSupport implements Navigate<Processor>, Traceable, IdAware, RouteIdAware {
    private static final Logger LOG = LoggerFactory.getLogger(Pipeline.class);

    private final CamelContext camelContext;
    private final ReactiveExecutor reactiveExecutor;
    private final List<AsyncProcessor> processors;
    private final int size;
    // created in doBuild(); null until the service has been built
    private PooledExchangeTaskFactory taskFactory;
    private String id;
    private String routeId;

    /**
     * Creates a pipeline over the given processors, in iteration order.
     *
     * @param camelContext the context providing the reactive executor and the exchange factory
     * @param processors   the steps; each is converted to an {@link AsyncProcessor}
     */
    public Pipeline(CamelContext camelContext, Collection<Processor> processors) {
        this.camelContext = camelContext;
        this.reactiveExecutor = camelContext.adapt(ExtendedCamelContext.class).getReactiveExecutor();
        this.processors = processors.stream()
                .map(AsyncProcessorConverterHelper::convert)
                .collect(Collectors.toList());
        this.size = processors.size();
    }

    /**
     * Creates the most compact processor for the given list.
     *
     * @param camelContext the context used when a pipeline has to be created
     * @param processors   the steps (must not be {@code null})
     * @return {@code null} for an empty list, the sole element for a one-element list, otherwise a
     *         new {@link Pipeline}
     */
    public static Processor newInstance(CamelContext camelContext, List<Processor> processors) {
        if (processors.isEmpty()) {
            return null;
        }
        if (processors.size() == 1) {
            return processors.get(0);
        }
        return new Pipeline(camelContext, processors);
    }

    /**
     * Varargs variant of {@link #newInstance(CamelContext, List)}.
     *
     * <p>{@code null} elements are dropped before the pipeline is created. The short-cuts for
     * zero/one processor are decided on the raw array length, so when more than one element is
     * passed a {@link Pipeline} is always returned, even if filtering leaves fewer than two steps.
     *
     * @param camelContext the context used when a pipeline has to be created
     * @param processors   the steps; may be {@code null} or contain {@code null} elements
     * @return {@code null} for a {@code null}/empty array, the sole element for a one-element
     *         array, otherwise a new {@link Pipeline} over the non-null elements
     */
    public static Processor newInstance(CamelContext camelContext, Processor... processors) {
        if (processors == null || processors.length == 0) {
            return null;
        }
        if (processors.length == 1) {
            return processors[0];
        }
        List<Processor> toBeProcessed = new ArrayList<>(processors.length);
        for (Processor processor : processors) {
            if (processor != null) {
                toBeProcessed.add(processor);
            }
        }
        return new Pipeline(camelContext, toBeProcessed);
    }

    /**
     * Acquires a routing task for the exchange and hands it to the reactive executor.
     *
     * <p>Transacted exchanges are scheduled with {@code scheduleSync}, all others with
     * {@code scheduleMain}. If acquiring or scheduling the task fails, the exception is stored on
     * the exchange and the callback is completed here instead of letting the failure escape to
     * the caller.
     *
     * @param exchange the exchange to route
     * @param callback notified when routing has finished
     * @return {@code false} when the task was scheduled (the callback is invoked by the task),
     *         {@code true} when scheduling failed and the callback was already invoked
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            PooledExchangeTask task = taskFactory.acquire(exchange, callback);
            if (exchange.isTransacted()) {
                reactiveExecutor.scheduleSync(task);
            } else {
                reactiveExecutor.scheduleMain(task);
            }
            return false;
        } catch (Exception e) {
            // never leave the caller without a completed callback
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    /**
     * Creates the task factory (pooled or prototype, depending on whether the context's exchange
     * factory is pooled) and builds it together with the child processors.
     */
    @Override
    protected void doBuild() throws Exception {
        // touch helper and task classes up front so they are loaded before the first exchange
        PipelineHelper.warmup(LOG);
        PipelineTask dummy = new PipelineTask();
        LOG.trace("Warming up Pipeline loaded class: {}", dummy.getClass().getName());

        ExtendedCamelContext extended = camelContext.adapt(ExtendedCamelContext.class);
        boolean pooled = extended.getExchangeFactory().isPooled();
        if (pooled) {
            taskFactory = new PooledTaskFactory(getId()) {
                @Override
                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
                    return new PipelineTask();
                }
            };
            // size the task pool like the exchange pool
            int capacity = extended.getExchangeFactory().getCapacity();
            taskFactory.setCapacity(capacity);
        } else {
            taskFactory = new PrototypeTaskFactory() {
                @Override
                public PooledExchangeTask create(Exchange exchange, AsyncCallback callback) {
                    return new PipelineTask();
                }
            };
        }
        LOG.trace("Using TaskFactory: {}", taskFactory);

        ServiceHelper.buildService(taskFactory, processors);
    }

    /** Initialises the task factory and the child processors. */
    @Override
    protected void doInit() throws Exception {
        ServiceHelper.initService(taskFactory, processors);
    }

    /** Starts the task factory and the child processors. */
    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startService(taskFactory, processors);
    }

    /** Stops the task factory and the child processors. */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(taskFactory, processors);
    }

    /** Stops and shuts down the task factory and the child processors. */
    @Override
    protected void doShutdown() throws Exception {
        ServiceHelper.stopAndShutdownServices(taskFactory, processors);
    }

    /**
     * Returns the id of this pipeline.
     *
     * @return the id, or {@code null} when no id has been assigned yet
     */
    @Override
    public String toString() {
        return id;
    }

    @Override
    public String getTraceLabel() {
        return "pipeline";
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String getRouteId() {
        return routeId;
    }

    @Override
    public void setRouteId(String routeId) {
        this.routeId = routeId;
    }

    /**
     * Returns the steps of this pipeline.
     *
     * @return a fresh list containing the processors, or {@code null} when there are none
     */
    @Override
    public List<Processor> next() {
        if (!hasNext()) {
            return null;
        }
        return new ArrayList<>(processors);
    }

    /**
     * @return {@code true} when this pipeline has at least one processor
     */
    @Override
    public boolean hasNext() {
        return processors != null && !processors.isEmpty();
    }

    /**
     * Routing state for a single exchange.
     *
     * <p>The task doubles as the {@link AsyncCallback} given to every step: when a step completes,
     * {@link #done(boolean)} reschedules the task, and {@link #run()} then either invokes the next
     * step or finishes the exchange.
     */
    private final class PipelineTask implements PooledExchangeTask, AsyncCallback {
        private Exchange exchange;
        private AsyncCallback callback;
        // position of the next processor to invoke
        private int index;

        PipelineTask() {
        }

        /** Binds the task to an exchange and its completion callback, starting at the first step. */
        @Override
        public void prepare(Exchange exchange, AsyncCallback callback) {
            this.exchange = exchange;
            this.callback = callback;
            this.index = 0;
        }

        /** Clears all state so the task holds no references once it has been released. */
        @Override
        public void reset() {
            this.exchange = null;
            this.callback = null;
            this.index = 0;
        }

        /** Called when a step has completed; schedules this task again to continue routing. */
        @Override
        public void done(boolean doneSync) {
            Pipeline.this.reactiveExecutor.schedule(this);
        }

        /**
         * Invokes the next step, or completes the exchange when routing is stopped, all steps have
         * run, or {@link PipelineHelper#continueProcessing} says to break out.
         */
        @Override
        public void run() {
            boolean stop = exchange.isRouteStop();
            int num = index;
            boolean more = num < Pipeline.this.size;
            boolean first = num == 0;

            // the first step always runs; later steps only while the exchange may continue
            if (!stop && more
                    && (first || PipelineHelper.continueProcessing(exchange, "so breaking out of pipeline", LOG))) {
                // the previous step's OUT message becomes the input of the next step
                if (exchange.hasOut()) {
                    exchange.setIn(exchange.getOut());
                    exchange.setOut(null);
                }
                AsyncProcessor processor = Pipeline.this.processors.get(index++);
                processor.process(exchange, this);
            } else {
                ExchangeHelper.copyResults(exchange, exchange);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
                }
                // read the callback before releasing: release() may reset this task
                AsyncCallback cb = callback;
                Pipeline.this.taskFactory.release(this);
                Pipeline.this.reactiveExecutor.schedule(cb);
            }
        }
    }
}

