/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.file;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.regex.Pattern;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOnCompletion;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.component.file.GenericFileProcessStrategy;
import org.apache.camel.support.EmptyAsyncCallback;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.StringHelper;
import org.apache.camel.util.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class GenericFileConsumer<T>
extends ScheduledBatchPollingConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(GenericFileConsumer.class);
    protected GenericFileEndpoint<T> endpoint;
    protected GenericFileOperations<T> operations;
    protected GenericFileProcessStrategy<T> processStrategy;
    protected volatile ShutdownRunningTask shutdownRunningTask;
    protected volatile int pendingExchanges;
    protected Processor customProcessor;
    protected boolean eagerLimitMaxMessagesPerPoll = true;
    protected volatile boolean prepareOnStartup;
    private final Pattern includePattern;
    private final Pattern excludePattern;
    private final String[] includeExt;
    private final String[] excludeExt;

    public GenericFileConsumer(GenericFileEndpoint<T> endpoint, Processor processor, GenericFileOperations<T> operations, GenericFileProcessStrategy<T> processStrategy) {
        super(endpoint, processor);
        this.endpoint = endpoint;
        this.operations = operations;
        this.processStrategy = processStrategy;
        this.includePattern = endpoint.getIncludePattern();
        this.excludePattern = endpoint.getExcludePattern();
        this.includeExt = endpoint.getIncludeExt() != null ? endpoint.getIncludeExt().split(",") : null;
        this.excludeExt = endpoint.getExcludeExt() != null ? endpoint.getExcludeExt().split(",") : null;
    }

    public Processor getCustomProcessor() {
        return this.customProcessor;
    }

    public void setCustomProcessor(Processor processor) {
        this.customProcessor = processor;
    }

    public boolean isEagerLimitMaxMessagesPerPoll() {
        return this.eagerLimitMaxMessagesPerPoll;
    }

    public void setEagerLimitMaxMessagesPerPoll(boolean eagerLimitMaxMessagesPerPoll) {
        this.eagerLimitMaxMessagesPerPoll = eagerLimitMaxMessagesPerPoll;
    }

    protected abstract Exchange createExchange(GenericFile<T> var1);

    @Override
    public int poll() throws Exception {
        int n;
        boolean limitHit;
        if (!this.prepareOnStartup) {
            this.processStrategy.prepareOnStartup(this.operations, this.endpoint);
            this.prepareOnStartup = true;
        }
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;
        if (!this.prePollCheck()) {
            LOG.debug("Skipping poll as pre poll check returned false");
            return 0;
        }
        ArrayList<GenericFile<T>> files = new ArrayList<GenericFile<T>>();
        String name = this.endpoint.getConfiguration().getDirectory();
        StopWatch stop = new StopWatch();
        try {
            limitHit = !this.pollDirectory(name, files, 0);
        }
        catch (Exception e) {
            LOG.debug("Error occurred during poll directory: {} due {}. Removing {} files marked as in-progress.", new Object[]{name, e.getMessage(), files.size()});
            this.removeExcessiveInProgressFiles(files);
            throw e;
        }
        long delta = stop.taken();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Took {} to poll: {}", (Object)TimeUtils.printDuration(delta), (Object)name);
        }
        if (limitHit) {
            LOG.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", (Object)this.maxMessagesPerPoll);
        }
        if (this.endpoint.getSorter() != null) {
            files.sort(this.endpoint.getSorter());
        }
        LinkedList<Exchange> exchanges = new LinkedList<Exchange>();
        for (GenericFile genericFile : files) {
            Exchange exchange = this.createExchange(genericFile);
            this.endpoint.configureExchange(exchange);
            this.endpoint.configureMessage(genericFile, exchange.getIn());
            exchanges.add(exchange);
        }
        if (this.endpoint.getSortBy() != null) {
            exchanges.sort(this.endpoint.getSortBy());
        }
        if (this.endpoint.isShuffle()) {
            Collections.shuffle(exchanges);
        }
        LinkedList<Exchange> q = exchanges;
        if (!this.eagerLimitMaxMessagesPerPoll && this.maxMessagesPerPoll > 0 && files.size() > this.maxMessagesPerPoll) {
            LOG.debug("Limiting maximum messages to poll at {} files as there were more messages in this poll.", (Object)this.maxMessagesPerPoll);
            this.removeExcessiveInProgressFiles(q, this.maxMessagesPerPoll);
        }
        if ((n = exchanges.size()) > 0) {
            LOG.debug("Total {} files to consume", (Object)n);
        }
        int polledMessages = this.processBatch(CastUtils.cast(q));
        this.postPollCheck(polledMessages);
        return polledMessages;
    }

    @Override
    public int processBatch(Queue<Object> exchanges) {
        int total;
        int answer = total = exchanges.size();
        if (this.maxMessagesPerPoll > 0 && total > this.maxMessagesPerPoll) {
            LOG.debug("Limiting to maximum messages to poll {} as there were {} messages in this poll.", (Object)this.maxMessagesPerPoll, (Object)total);
            total = this.maxMessagesPerPoll;
        }
        for (int index = 0; index < total && this.isBatchAllowed(); ++index) {
            Exchange exchange = (Exchange)exchanges.poll();
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, (Object)index);
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, (Object)total);
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, (Object)(index == total - 1 ? 1 : 0));
            this.pendingExchanges = total - index - 1;
            boolean started = this.customProcessor != null ? this.customProcessExchange(exchange, this.customProcessor) : this.processExchange(exchange);
            if (started) continue;
            --answer;
        }
        this.removeExcessiveInProgressFiles(CastUtils.cast((Deque)exchanges, Exchange.class), 0);
        return answer;
    }

    protected void removeExcessiveInProgressFiles(Deque<Exchange> exchanges, int limit) {
        while (exchanges.size() > limit) {
            Exchange exchange = exchanges.removeLast();
            GenericFile file = exchange.getProperty("CamelFileExchangeFile", GenericFile.class);
            String key = file.getAbsoluteFilePath();
            this.endpoint.getInProgressRepository().remove(key);
            this.releaseExchange(exchange, true);
        }
    }

    protected void removeExcessiveInProgressFiles(List<GenericFile<T>> files) {
        for (GenericFile<T> file : files) {
            String key = file.getAbsoluteFilePath();
            this.endpoint.getInProgressRepository().remove(key);
        }
    }

    public boolean canPollMoreFiles(List<?> fileList) {
        if (!this.eagerLimitMaxMessagesPerPoll) {
            return true;
        }
        if (this.maxMessagesPerPoll <= 0) {
            return true;
        }
        return fileList.size() < this.maxMessagesPerPoll;
    }

    protected boolean prePollCheck() throws Exception {
        return true;
    }

    protected void postPollCheck(int polledMessages) {
    }

    protected abstract boolean pollDirectory(String var1, List<GenericFile<T>> var2, int var3);

    public void setOperations(GenericFileOperations<T> operations) {
        this.operations = operations;
    }

    protected boolean ignoreCannotRetrieveFile(String name, Exchange exchange, Exception cause) {
        return false;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean processExchange(Exchange exchange) {
        GenericFile<T> file = this.getExchangeFileProperty(exchange);
        LOG.trace("Processing file: {}", file);
        String absoluteFileName = file.getAbsoluteFilePath();
        Exception beginCause = null;
        boolean begin = false;
        try {
            begin = this.processStrategy.begin(this.operations, this.endpoint, exchange, file);
        }
        catch (Exception e) {
            beginCause = e;
        }
        if (!begin) {
            Exception abortCause = null;
            LOG.debug("{} cannot begin processing file: {}", this.endpoint, file);
            try {
                this.processStrategy.abort(this.operations, this.endpoint, exchange, file);
            }
            catch (Exception e) {
                abortCause = e;
            }
            finally {
                this.endpoint.getInProgressRepository().remove(absoluteFileName);
            }
            if (beginCause != null) {
                String msg = this.endpoint + " cannot begin processing file: " + file + " due to: " + beginCause.getMessage();
                this.handleException(msg, beginCause);
            }
            if (abortCause != null) {
                String msg2 = this.endpoint + " cannot abort processing file: " + file + " due to: " + abortCause.getMessage();
                this.handleException(msg2, abortCause);
            }
            return false;
        }
        GenericFile<T> target = this.getExchangeFileProperty(exchange);
        this.updateFileHeaders(target, exchange.getIn());
        String name = target.getAbsoluteFilePath();
        try {
            if (this.isRetrieveFile()) {
                boolean retrieved;
                LOG.trace("Retrieving file: {} from: {}", (Object)name, this.endpoint);
                Exception cause = null;
                try {
                    retrieved = this.operations.retrieveFile(name, exchange, target.getFileLength());
                }
                catch (Exception e) {
                    retrieved = false;
                    cause = e;
                }
                if (!retrieved) {
                    if (this.ignoreCannotRetrieveFile(name, exchange, cause)) {
                        LOG.trace("Cannot retrieve file {} maybe it does not exists. Ignoring.", (Object)name);
                        this.endpoint.getInProgressRepository().remove(absoluteFileName);
                        return false;
                    }
                    if (cause instanceof GenericFileOperationFailedException) {
                        throw cause;
                    }
                    throw new GenericFileOperationFailedException("Cannot retrieve file: " + file + " from: " + this.endpoint, cause);
                }
                LOG.trace("Retrieved file: {} from: {}", (Object)name, this.endpoint);
            } else {
                LOG.trace("Skipped retrieval of file: {} from: {}", (Object)name, this.endpoint);
                exchange.getIn().setBody(null);
            }
            exchange.adapt(ExtendedExchange.class).addOnCompletion(new GenericFileOnCompletion<T>(this.endpoint, this.operations, this.processStrategy, target, absoluteFileName));
            LOG.debug("About to process file: {} using exchange: {}", target, (Object)exchange);
            if (this.endpoint.isSynchronous()) {
                this.getProcessor().process(exchange);
            } else {
                this.getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
            }
        }
        catch (Exception e) {
            this.endpoint.getInProgressRepository().remove(absoluteFileName);
            String msg = "Error processing file " + file + " due to " + e.getMessage();
            this.handleException(msg, e);
        }
        return true;
    }

    protected abstract void updateFileHeaders(GenericFile<T> var1, Message var2);

    protected boolean isRetrieveFile() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected boolean customProcessExchange(Exchange exchange, Processor processor) {
        GenericFile<T> file = this.getExchangeFileProperty(exchange);
        LOG.trace("Custom processing file: {}", file);
        String absoluteFileName = file.getAbsoluteFilePath();
        try {
            processor.process(exchange);
        }
        catch (Exception e) {
            LOG.debug("{} error custom processing: {} due to: {}. This exception will be ignored.", new Object[]{this.endpoint, file, e.getMessage(), e});
            this.handleException(e);
        }
        finally {
            this.endpoint.getInProgressRepository().remove(absoluteFileName);
        }
        return true;
    }

    protected boolean isValidFile(GenericFile<T> file, boolean isDirectory, List<T> files) {
        String absoluteFilePath = file.getAbsoluteFilePath();
        if (!this.isMatched(file, isDirectory, files)) {
            LOG.trace("File did not match. Will skip this file: {}", file);
            return false;
        }
        if (isDirectory) {
            return true;
        }
        if (this.endpoint.getInProgressRepository().contains(absoluteFilePath)) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Skipping as file is already in progress: {}", (Object)file.getFileName());
            }
            return false;
        }
        if (this.endpoint.isIdempotent().booleanValue()) {
            String key = file.getAbsoluteFilePath();
            if (this.endpoint.getIdempotentKey() != null) {
                Exchange dummy = this.endpoint.createExchange(file);
                key = this.endpoint.getIdempotentKey().evaluate(dummy, String.class);
                LOG.trace("Evaluated idempotentKey: {} for file: {}", (Object)key, file);
            }
            if (key != null && this.endpoint.getIdempotentRepository().contains(key)) {
                LOG.trace("This consumer is idempotent and the file has been consumed before matching idempotentKey: {}. Will skip this file: {}", (Object)key, file);
                return false;
            }
        }
        return this.endpoint.getInProgressRepository().add(absoluteFilePath);
    }

    protected boolean isMatched(GenericFile<T> file, boolean isDirectory, List<T> files) {
        String result;
        String ext;
        Exchange dummy;
        String name = file.getFileNameOnly();
        if (name.startsWith(".")) {
            return false;
        }
        if (name.endsWith(".camelLock")) {
            return false;
        }
        if (this.endpoint.getFilter() != null && !this.endpoint.getFilter().accept(file)) {
            return false;
        }
        if (this.endpoint.getAntFilter() != null && !this.endpoint.getAntFilter().accept(file)) {
            return false;
        }
        if (isDirectory && this.endpoint.getFilterDirectory() != null) {
            dummy = this.endpoint.createExchange(file);
            boolean matches = this.endpoint.getFilterDirectory().matches(dummy);
            if (!matches) {
                return false;
            }
        }
        if (isDirectory) {
            return true;
        }
        if (this.excludePattern != null && this.excludePattern.matcher(name).matches()) {
            return false;
        }
        if (this.excludeExt != null) {
            ext = FileUtil.onlyExt(file.getFileName());
            for (String exclude : this.excludeExt) {
                if (!exclude.equalsIgnoreCase(ext)) continue;
                return false;
            }
        }
        if (this.includePattern != null && !this.includePattern.matcher(name).matches()) {
            return false;
        }
        if (this.includeExt != null) {
            ext = FileUtil.onlyExt(file.getFileName());
            boolean any = false;
            for (String include : this.includeExt) {
                any |= include.equalsIgnoreCase(ext);
            }
            if (!any) {
                return false;
            }
        }
        if (this.endpoint.getFileName() != null && (result = this.evaluateFileExpression(dummy = this.endpoint.createExchange(file))) != null && !name.equals(result)) {
            return false;
        }
        if (this.endpoint.getFilterFile() != null) {
            dummy = this.endpoint.createExchange(file);
            boolean matches = this.endpoint.getFilterFile().matches(dummy);
            if (!matches) {
                return false;
            }
        }
        if (this.endpoint.getDoneFileName() != null) {
            String doneFileName = this.endpoint.createDoneFileName(file.getAbsoluteFilePath());
            StringHelper.notEmpty(doneFileName, "doneFileName", this.endpoint);
            if (this.endpoint.isDoneFile(file.getFileNameOnly())) {
                LOG.trace("Skipping done file: {}", file);
                return false;
            }
            if (!this.isMatched(file, doneFileName, files)) {
                return false;
            }
        }
        return true;
    }

    protected abstract boolean isMatched(GenericFile<T> var1, String var2, List<T> var3);

    protected String evaluateFileExpression(Exchange exchange) {
        String result = this.endpoint.getFileName().evaluate(exchange, String.class);
        if (exchange.getException() != null) {
            throw RuntimeCamelException.wrapRuntimeCamelException(exchange.getException());
        }
        return result;
    }

    private GenericFile<T> getExchangeFileProperty(Exchange exchange) {
        return (GenericFile)exchange.getProperty("CamelFileExchangeFile");
    }

    @Override
    protected void doInit() throws Exception {
        super.doInit();
        if (this.processStrategy instanceof CamelContextAware) {
            ((CamelContextAware)((Object)this.processStrategy)).setCamelContext(this.getEndpoint().getCamelContext());
        }
    }

    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startService(this.processStrategy);
        super.doStart();
    }

    @Override
    protected void doStop() throws Exception {
        this.prepareOnStartup = false;
        super.doStop();
        ServiceHelper.stopService(this.processStrategy);
    }

    @Override
    public void onInit() throws Exception {
    }

    @Override
    public long beforePoll(long timeout) throws Exception {
        return timeout;
    }

    @Override
    public void afterPoll() throws Exception {
    }
}

