package org.apache.camel.component.hdfs;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import javax.security.auth.login.Configuration;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.support.ScheduledPollConsumer;
import org.apache.camel.util.IOHelper;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/hdfs/HdfsConsumer.class */
/**
 * Scheduled polling consumer that looks up files on an HDFS path, reads them through
 * {@link HdfsInputStream} and hands every key/value pair read to the configured
 * {@link Processor} as a separate exchange.
 */
public final class HdfsConsumer extends ScheduledPollConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(HdfsConsumer.class);
    private final HdfsConfiguration endpointConfig;
    private final StringBuilder hdfsPath;
    private final Processor processor;
    private final HdfsInfoFactory hdfsInfoFactory;
    private final ReadWriteLock rwLock;

    /**
     * Creates a consumer whose {@link HdfsInfoFactory} and HDFS path are derived from the given configuration.
     *
     * @param hdfsEndpoint      the endpoint this consumer belongs to
     * @param processor         the processor that receives the created exchanges
     * @param hdfsConfiguration the endpoint configuration
     */
    public HdfsConsumer(HdfsEndpoint hdfsEndpoint, Processor processor, HdfsConfiguration hdfsConfiguration) {
        this(hdfsEndpoint, processor, hdfsConfiguration, new HdfsInfoFactory(hdfsConfiguration),
                hdfsConfiguration.getFileSystemType().getHdfsPath(hdfsConfiguration));
    }

    /**
     * Creates a consumer with an explicitly supplied {@link HdfsInfoFactory} and HDFS path.
     *
     * @param hdfsEndpoint      the endpoint this consumer belongs to
     * @param processor         the processor that receives the created exchanges
     * @param hdfsConfiguration the endpoint configuration
     * @param hdfsInfoFactory   the factory used to obtain {@link HdfsInfo} instances
     * @param hdfsPath          the HDFS path to poll
     */
    HdfsConsumer(HdfsEndpoint hdfsEndpoint, Processor processor, HdfsConfiguration hdfsConfiguration,
                 HdfsInfoFactory hdfsInfoFactory, StringBuilder hdfsPath) {
        super(hdfsEndpoint, processor);
        this.rwLock = new ReentrantReadWriteLock();
        this.processor = processor;
        this.endpointConfig = hdfsConfiguration;
        this.hdfsPath = hdfsPath;
        this.hdfsInfoFactory = hdfsInfoFactory;
        setUseFixedDelay(true);
    }

    @Override
    public HdfsEndpoint getEndpoint() {
        return (HdfsEndpoint) super.getEndpoint();
    }

    /**
     * Starts the consumer and, when the configuration asks for it, connects to HDFS right away.
     * The {@link HdfsInfo} obtained by that eager connection is not retained.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (this.endpointConfig.isConnectOnStartup()) {
            setupHdfs(true);
        }
    }

    /**
     * Obtains a new {@link HdfsInfo} for the configured path, logging before and after the call.
     *
     * @param  onStartup   {@code true} to log at INFO level, {@code false} to log at DEBUG level
     * @return             the {@link HdfsInfo} created by the factory
     * @throws IOException if the factory fails to create the {@link HdfsInfo}
     */
    private HdfsInfo setupHdfs(boolean onStartup) throws IOException {
        String path = this.hdfsPath.toString();
        String fileSystemLabel = this.endpointConfig.getFileSystemLabel(path);
        if (onStartup) {
            LOG.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", fileSystemLabel);
        } else {
            LOG.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", fileSystemLabel);
        }

        HdfsInfo hdfsInfo = this.hdfsInfoFactory.newHdfsInfo(path);

        if (onStartup) {
            LOG.info("Connected to hdfs file-system {}", fileSystemLabel);
        } else {
            LOG.debug("Connected to hdfs file-system {}", fileSystemLabel);
        }
        return hdfsInfo;
    }

    /**
     * Runs one poll. The JAAS configuration that is active before the poll is captured and set
     * again afterwards, whether or not the poll succeeds.
     *
     * @return the value returned by {@link #doPoll()}
     */
    @Override
    protected int poll() throws Exception {
        Configuration previousJaasConfiguration = HdfsComponent.getJAASConfiguration();
        try {
            return doPoll();
        } finally {
            HdfsComponent.setJAASConfiguration(previousJaasConfiguration);
        }
    }

    /**
     * Lists the candidate files and processes them.
     * <p>
     * If the configured path is a file, only that path is globbed. Otherwise the configured pattern
     * is appended to the path and paths ending with the opened or read suffix are excluded.
     *
     * @return             the number of exchanges for which the processor returned without throwing
     * @throws IOException if connecting to or listing the file system fails
     */
    protected int doPoll() throws IOException {
        HdfsInfo hdfsInfo = setupHdfs(false);

        FileStatus[] fileStatuses;
        if (hdfsInfo.getFileSystem().isFile(hdfsInfo.getPath())) {
            fileStatuses = hdfsInfo.getFileSystem().globStatus(hdfsInfo.getPath());
        } else {
            Path pattern = hdfsInfo.getPath().suffix("/" + this.endpointConfig.getPattern());
            PathFilter excludeOpenedAndRead = this::isNeitherOpenedNorRead;
            fileStatuses = hdfsInfo.getFileSystem().globStatus(pattern, excludeOpenedAndRead);
        }

        // The glob result is treated as "no files" when it is null.
        return processFileStatuses(hdfsInfo, Optional.ofNullable(fileStatuses).orElseGet(() -> new FileStatus[0]));
    }

    /**
     * Tells whether the path ends with neither the configured opened suffix nor the configured read suffix.
     */
    private boolean isNeitherOpenedNorRead(Path path) {
        String name = path.toString();
        return !(name.endsWith(this.endpointConfig.getOpenedSuffix())
                || name.endsWith(this.endpointConfig.getReadSuffix()));
    }

    /**
     * Filters the given statuses, opens an {@link HdfsInputStream} for each remaining one (at most
     * {@code maxMessagesPerPoll}) and processes the streams one after another. Each stream is
     * closed after it has been processed, also when processing fails.
     *
     * @param  hdfsInfo      the file system information used for the {@code _SUCCESS} check
     * @param  fileStatusArr the candidate files, never {@code null}
     * @return               the number of exchanges for which the processor returned without throwing
     */
    private int processFileStatuses(HdfsInfo hdfsInfo, FileStatus[] fileStatusArr) {
        AtomicInteger totalMessageCount = new AtomicInteger();

        // NOTE(review): all streams are opened up front; if processing one of them throws, the
        // streams after it are not closed here. Left as-is because the side effects of
        // HdfsInputStream.close() are not visible from this class - confirm before changing.
        List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatusArr)
                .filter(status -> normalFileIsDirectoryHasSuccessFile(status, hdfsInfo))
                .filter(this::hasMatchingOwner)
                .limit(this.endpointConfig.getMaxMessagesPerPoll())
                .map(this::asHdfsFile)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());

        LOG.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatusArr.length);

        for (int i = 0; i < hdfsFiles.size(); i++) {
            HdfsInputStream hdfsFile = hdfsFiles.get(i);
            try {
                int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
                // i is zero-based, so i + 1 files have been processed at this point.
                LOG.debug("Processed [{}] files out of [{}].", i + 1, hdfsFiles.size());
                LOG.debug("File [{}] was split to [{}] messages.", i, messageCount);
            } finally {
                IOHelper.close(hdfsFile, "hdfs file", LOG);
            }
        }

        return totalMessageCount.get();
    }

    /**
     * Reads the stream until {@code next} returns a negative value and creates one exchange per read.
     *
     * @param  hdfsInputStream   the stream to read
     * @param  totalMessageCount counter shared across all files of the current poll
     * @return                   the number of reads performed on this stream
     */
    private int processHdfsInputStream(HdfsInputStream hdfsInputStream, AtomicInteger totalMessageCount) {
        AtomicInteger messageCount = new AtomicInteger();
        Holder<Object> key = new Holder<>();
        Holder<Object> value = new Holder<>();

        while (hdfsInputStream.next(key, value) >= 0) {
            processHdfsInputStream(hdfsInputStream, key, value, messageCount, totalMessageCount);
            messageCount.incrementAndGet();
        }

        return messageCount.get();
    }

    /**
     * Creates an exchange from the current key/value pair and passes it to the processor.
     * <p>
     * An exception thrown while building or processing the exchange is stored on the exchange.
     * Any exception present on the exchange afterwards is passed to the exception handler, and the
     * exchange is released in every case.
     *
     * @param hdfsInputStream   the stream the pair was read from
     * @param key               holder of the key; set as a header only when its value is not {@code null}
     * @param value             holder of the value; used as the message body
     * @param messageCount      number of messages already created for this stream, used as split index
     * @param totalMessageCount counter incremented after the processor returns without throwing
     */
    private void processHdfsInputStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value,
                                        AtomicInteger messageCount, AtomicInteger totalMessageCount) {
        Exchange exchange = createExchange(false);
        try {
            Message message = exchange.getIn();
            String fileName = StringUtils.substringAfterLast(hdfsInputStream.getActualPath(), "/");
            message.setHeader("CamelFileName", fileName);
            message.setHeader("CamelFileNameConsumed", fileName);
            message.setHeader(HdfsConstants.FILE_ABSOLUTE_PATH, hdfsInputStream.getActualPath());
            if (key.getValue() != null) {
                message.setHeader(HdfsConstants.KEY, key.getValue());
            }
            if (hdfsInputStream.getNumOfReadBytes() >= 0) {
                message.setHeader("CamelFileLength", hdfsInputStream.getNumOfReadBytes());
            }
            message.setBody(value.getValue());

            updateNewExchange(exchange, messageCount.get(), hdfsInputStream);

            LOG.debug("Processing file [{}]", fileName);
            this.processor.process(exchange);
            totalMessageCount.incrementAndGet();
        } catch (Exception e) {
            exchange.setException(e);
        } finally {
            try {
                // Let the exception handler deal with whatever is left on the exchange.
                if (exchange.getException() != null) {
                    getExceptionHandler().handleException(exchange.getException());
                }
            } finally {
                // Release the exchange even if the exception handler itself throws.
                releaseExchange(exchange, false);
            }
        }
    }

    /**
     * Accepts every status unless the configured file type equals {@code NORMAL_FILE} and the
     * status is a directory; such a directory is accepted only when it contains a
     * {@code _SUCCESS} entry.
     *
     * @throws RuntimeCamelException if the existence check fails with an {@link IOException}
     */
    private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo hdfsInfo) {
        boolean normalFileDirectory
                = this.endpointConfig.getFileType().equals(HdfsFileType.NORMAL_FILE) && fileStatus.isDirectory();
        if (!normalFileDirectory) {
            return true;
        }
        try {
            Path successFile = new Path(fileStatus.getPath().toString() + "/_SUCCESS");
            return hdfsInfo.getFileSystem().exists(successFile);
        } catch (IOException e) {
            throw new RuntimeCamelException(e);
        }
    }

    /**
     * Tells whether the file passes the owner check: either no owner is configured or the
     * configured owner equals the owner of the file. A rejected file is logged at DEBUG level.
     */
    private boolean hasMatchingOwner(FileStatus fileStatus) {
        String expectedOwner = this.endpointConfig.getOwner();
        if (expectedOwner == null || expectedOwner.equals(fileStatus.getOwner())) {
            return true;
        }
        LOG.debug("Skipping file: {} as not matching owner: {}", fileStatus.getPath(), expectedOwner);
        return false;
    }

    /**
     * Creates the {@link HdfsInputStream} for the given file while holding the write lock.
     */
    private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
        // Acquire the lock before entering try so that unlock() only runs when the lock is held.
        this.rwLock.writeLock().lock();
        try {
            return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.hdfsInfoFactory);
        } finally {
            this.rwLock.writeLock().unlock();
        }
    }

    /**
     * Clears the unit of work of the exchange and sets the split related exchange properties.
     * {@code SPLIT_SIZE} is only set when the stream reports that nothing follows.
     *
     * @param exchange        the exchange to update
     * @param i               the zero-based index of the message within the current file
     * @param hdfsInputStream the stream the message was read from
     */
    protected void updateNewExchange(Exchange exchange, int i, HdfsInputStream hdfsInputStream) {
        exchange.adapt(ExtendedExchange.class).setUnitOfWork(null);
        exchange.setProperty(ExchangePropertyKey.SPLIT_INDEX, i);

        if (hdfsInputStream.hasNext()) {
            exchange.setProperty(ExchangePropertyKey.SPLIT_COMPLETE, Boolean.FALSE);
            return;
        }
        exchange.setProperty(ExchangePropertyKey.SPLIT_COMPLETE, Boolean.TRUE);
        exchange.setProperty(ExchangePropertyKey.SPLIT_SIZE, i + 1);
    }
}
