/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.dataset;

import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.dataset.DataSet;
import org.apache.camel.component.dataset.DataSetEndpoint;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.processor.ThroughputLogger;
import org.apache.camel.util.URISupport;

/**
 * Consumer for the dataset endpoint: on start it generates one exchange per
 * message index of the endpoint's {@link DataSet} and hands each exchange to
 * the configured {@link Processor}.
 *
 * <p>The first {@code preloadSize} messages are sent on the thread that calls
 * {@link #doStart()}; the remaining messages are sent from a single-thread
 * executor obtained from the Camel context.
 */
public class DataSetConsumer
extends DefaultConsumer {
    private final CamelContext camelContext;
    private final DataSetEndpoint endpoint;
    /** Invoked after every successfully processed exchange; created lazily in {@link #doStart()}. */
    private Processor reporter;
    /** Executor running the non-preloaded part of the data set; {@code null} while stopped. */
    private ExecutorService executorService;
    /** {@code true} unless the endpoint's dataSetIndex option equals {@code "off"}. */
    private final boolean withIndexHeader;

    /**
     * Creates the consumer.
     *
     * @param endpoint  the dataset endpoint supplying the data set and the timing options
     * @param processor the processor every generated exchange is sent to
     */
    public DataSetConsumer(DataSetEndpoint endpoint, Processor processor) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.camelContext = endpoint.getCamelContext();
        this.withIndexHeader = !endpoint.getDataSetIndex().equals("off");
    }

    /**
     * Starts the consumer: creates the reporter if none exists yet, sends the
     * preloaded messages synchronously, then schedules the remaining messages
     * on a newly created single-thread executor.
     *
     * @throws Exception if the superclass start or the executor creation fails
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (this.reporter == null) {
            this.reporter = this.createReporter();
        }
        final DataSet dataSet = this.endpoint.getDataSet();
        final long preloadSize = this.endpoint.getPreloadSize();

        // Preloaded messages are sent on the starting thread, before the executor exists.
        this.sendMessages(0L, preloadSize);

        this.executorService = this.camelContext.getExecutorServiceManager()
                .newSingleThreadExecutor(this, this.endpoint.getEndpointUri());
        this.executorService.execute(() -> {
            long initialDelay = this.endpoint.getInitialDelay();
            if (initialDelay > 0L) {
                try {
                    Thread.sleep(initialDelay);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up without sending the rest.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            this.sendMessages(preloadSize, dataSet.getSize());
        });
    }

    /**
     * Stops the consumer and shuts down the background executor, if one was created.
     *
     * @throws Exception if the superclass stop fails
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();
        if (this.executorService != null) {
            this.camelContext.getExecutorServiceManager().shutdown(this.executorService);
            this.executorService = null;
        }
    }

    /**
     * Creates an exchange and lets the endpoint's data set populate it for the
     * given index. Unless the index header is switched off, the index is also
     * stored in the {@code CamelDataSetIndex} header of the in message.
     *
     * @param messageIndex index of the message within the data set
     * @return the populated exchange
     * @throws Exception if the data set fails to populate the message
     */
    protected Exchange createExchange(long messageIndex) throws Exception {
        Exchange exchange = this.createExchange(false);
        this.endpoint.getDataSet().populateMessage(exchange, messageIndex);
        if (this.withIndexHeader) {
            Message in = exchange.getIn();
            in.setHeader("CamelDataSetIndex", messageIndex);
        }
        return exchange;
    }

    /**
     * Sends the messages with indices in {@code [startIndex, endIndex)} to the
     * processor, sleeping for the endpoint's produce delay (when positive)
     * after each one and then notifying the reporter.
     *
     * <p>Any exception raised while creating, processing or reporting a message
     * is passed to {@code handleException} and the loop continues with the next
     * index. If the thread is interrupted during the produce delay, the
     * interrupt flag is restored and the loop ends without reporting the
     * current message.
     *
     * <p>Each exchange is released exactly once, in the {@code finally} clause,
     * on every exit path (including the interrupt path).
     *
     * @param startIndex first message index to send (inclusive)
     * @param endIndex   index at which to stop (exclusive)
     */
    protected void sendMessages(long startIndex, long endIndex) {
        for (long i = startIndex; i < endIndex; ++i) {
            Exchange exchange = null;
            try {
                exchange = this.createExchange(i);
                this.getProcessor().process(exchange);
                try {
                    long delay = this.endpoint.getProduceDelay();
                    if (delay > 0L) {
                        Thread.sleep(delay);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // No explicit release here: the finally clause below runs on
                    // break and must remain the only place the exchange is released.
                    break;
                }
                if (this.reporter != null) {
                    this.reporter.process(exchange);
                }
            } catch (Exception e) {
                this.handleException(e);
            } finally {
                // exchange is still null here when createExchange itself failed.
                this.releaseExchange(exchange, false);
            }
        }
    }

    /**
     * Builds the default reporter: a {@link ThroughputLogger} that logs under
     * the sanitized endpoint URI, uses the data set's report count as its
     * group size and the action label {@code "Sent"}.
     *
     * @return the newly created throughput logger
     */
    protected ThroughputLogger createReporter() {
        String uri = URISupport.sanitizeUri(this.endpoint.getEndpointUri());
        CamelLogger logger = new CamelLogger(uri);
        ThroughputLogger answer = new ThroughputLogger(logger, (int)this.endpoint.getDataSet().getReportCount());
        answer.setAction("Sent");
        return answer;
    }
}

