/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.client.avro;

import com.google.common.base.Optional;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.DurablePositionTracker;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.EventDeserializerFactory;
import org.apache.flume.serialization.ResettableFileInputStream;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.apache.flume.tools.PlatformDetect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.annotations.VisibleForTesting;
import org.spark-project.guava.base.Charsets;
import org.spark-project.guava.base.Preconditions;
import org.spark-project.guava.io.Files;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableSpoolingFileEventReader
implements ReliableEventReader {
    private static final Logger logger = LoggerFactory.getLogger(ReliableSpoolingFileEventReader.class);
    static final String metaFileName = ".flumespool-main.meta";
    private final File spoolDirectory;
    private final String completedSuffix;
    private final String deserializerType;
    private final Context deserializerContext;
    private final Pattern ignorePattern;
    private final File metaFile;
    private final boolean annotateFileName;
    private final boolean annotateBaseName;
    private final String fileNameHeader;
    private final String baseNameHeader;
    private final String deletePolicy;
    private final Charset inputCharset;
    private final DecodeErrorPolicy decodeErrorPolicy;
    private final SpoolDirectorySourceConfigurationConstants.ConsumeOrder consumeOrder;
    private Optional<FileInfo> currentFile = Optional.absent();
    private Optional<FileInfo> lastFileRead = Optional.absent();
    private boolean committed = true;
    private Iterator<File> candidateFileIter = null;
    private int listFilesCount = 0;

    private ReliableSpoolingFileEventReader(File spoolDirectory, String completedSuffix, String ignorePattern, String trackerDirPath, boolean annotateFileName, String fileNameHeader, boolean annotateBaseName, String baseNameHeader, String deserializerType, Context deserializerContext, String deletePolicy, String inputCharset, DecodeErrorPolicy decodeErrorPolicy, SpoolDirectorySourceConfigurationConstants.ConsumeOrder consumeOrder) throws IOException {
        Preconditions.checkNotNull((Object)spoolDirectory);
        Preconditions.checkNotNull((Object)completedSuffix);
        Preconditions.checkNotNull((Object)ignorePattern);
        Preconditions.checkNotNull((Object)trackerDirPath);
        Preconditions.checkNotNull((Object)deserializerType);
        Preconditions.checkNotNull((Object)deserializerContext);
        Preconditions.checkNotNull((Object)deletePolicy);
        Preconditions.checkNotNull((Object)inputCharset);
        if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) && !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
            throw new IllegalArgumentException("Delete policies other than NEVER and IMMEDIATE are not yet supported");
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Initializing {} with directory={}, metaDir={}, deserializer={}", new Object[]{ReliableSpoolingFileEventReader.class.getSimpleName(), spoolDirectory, trackerDirPath, deserializerType});
        }
        Preconditions.checkState((boolean)spoolDirectory.exists(), (Object)("Directory does not exist: " + spoolDirectory.getAbsolutePath()));
        Preconditions.checkState((boolean)spoolDirectory.isDirectory(), (Object)("Path is not a directory: " + spoolDirectory.getAbsolutePath()));
        try {
            File canary = File.createTempFile("flume-spooldir-perm-check-", ".canary", spoolDirectory);
            Files.write((CharSequence)"testing flume file permissions\n", (File)canary, (Charset)Charsets.UTF_8);
            List lines = Files.readLines((File)canary, (Charset)Charsets.UTF_8);
            Preconditions.checkState((!lines.isEmpty() ? 1 : 0) != 0, (String)"Empty canary file %s", (Object[])new Object[]{canary});
            if (!canary.delete()) {
                throw new IOException("Unable to delete canary file " + canary);
            }
            logger.debug("Successfully created and deleted canary file: {}", (Object)canary);
        }
        catch (IOException e) {
            throw new FlumeException("Unable to read and modify files in the spooling directory: " + spoolDirectory, e);
        }
        this.spoolDirectory = spoolDirectory;
        this.completedSuffix = completedSuffix;
        this.deserializerType = deserializerType;
        this.deserializerContext = deserializerContext;
        this.annotateFileName = annotateFileName;
        this.fileNameHeader = fileNameHeader;
        this.annotateBaseName = annotateBaseName;
        this.baseNameHeader = baseNameHeader;
        this.ignorePattern = Pattern.compile(ignorePattern);
        this.deletePolicy = deletePolicy;
        this.inputCharset = Charset.forName(inputCharset);
        this.decodeErrorPolicy = (DecodeErrorPolicy)((Object)Preconditions.checkNotNull((Object)((Object)decodeErrorPolicy)));
        this.consumeOrder = (SpoolDirectorySourceConfigurationConstants.ConsumeOrder)((Object)Preconditions.checkNotNull((Object)((Object)consumeOrder)));
        File trackerDirectory = new File(trackerDirPath);
        if (!trackerDirectory.isAbsolute()) {
            trackerDirectory = new File(spoolDirectory, trackerDirPath);
        }
        if (!trackerDirectory.exists() && !trackerDirectory.mkdir()) {
            throw new IOException("Unable to mkdir nonexistent meta directory " + trackerDirectory);
        }
        if (!trackerDirectory.isDirectory()) {
            throw new IOException("Specified meta directory is not a directory" + trackerDirectory);
        }
        this.metaFile = new File(trackerDirectory, metaFileName);
        if (this.metaFile.exists() && this.metaFile.length() == 0L) {
            this.deleteMetaFile();
        }
    }

    @VisibleForTesting
    int getListFilesCount() {
        return this.listFilesCount;
    }

    public String getLastFileRead() {
        if (!this.lastFileRead.isPresent()) {
            return null;
        }
        return ((FileInfo)this.lastFileRead.get()).getFile().getAbsolutePath();
    }

    @Override
    public Event readEvent() throws IOException {
        List<Event> events = this.readEvents(1);
        if (!events.isEmpty()) {
            return events.get(0);
        }
        return null;
    }

    @Override
    public List<Event> readEvents(int numEvents) throws IOException {
        if (!this.committed) {
            if (!this.currentFile.isPresent()) {
                throw new IllegalStateException("File should not roll when commit is outstanding.");
            }
            logger.info("Last read was never committed - resetting mark position.");
            ((FileInfo)this.currentFile.get()).getDeserializer().reset();
        } else {
            if (!this.currentFile.isPresent()) {
                this.currentFile = this.getNextFile();
            }
            if (!this.currentFile.isPresent()) {
                return Collections.emptyList();
            }
        }
        EventDeserializer des = ((FileInfo)this.currentFile.get()).getDeserializer();
        List<Event> events = des.readEvents(numEvents);
        while (events.isEmpty()) {
            logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one.");
            this.retireCurrentFile();
            this.currentFile = this.getNextFile();
            if (!this.currentFile.isPresent()) {
                return Collections.emptyList();
            }
            events = ((FileInfo)this.currentFile.get()).getDeserializer().readEvents(numEvents);
        }
        if (this.annotateFileName) {
            String filename = ((FileInfo)this.currentFile.get()).getFile().getAbsolutePath();
            for (Event event : events) {
                event.getHeaders().put(this.fileNameHeader, filename);
            }
        }
        if (this.annotateBaseName) {
            String basename = ((FileInfo)this.currentFile.get()).getFile().getName();
            for (Event event : events) {
                event.getHeaders().put(this.baseNameHeader, basename);
            }
        }
        this.committed = false;
        this.lastFileRead = this.currentFile;
        return events;
    }

    @Override
    public void close() throws IOException {
        if (this.currentFile.isPresent()) {
            ((FileInfo)this.currentFile.get()).getDeserializer().close();
            this.currentFile = Optional.absent();
        }
    }

    @Override
    public void commit() throws IOException {
        if (!this.committed && this.currentFile.isPresent()) {
            ((FileInfo)this.currentFile.get()).getDeserializer().mark();
            this.committed = true;
        }
    }

    private void retireCurrentFile() throws IOException {
        Preconditions.checkState((boolean)this.currentFile.isPresent());
        File fileToRoll = new File(((FileInfo)this.currentFile.get()).getFile().getAbsolutePath());
        ((FileInfo)this.currentFile.get()).getDeserializer().close();
        if (fileToRoll.lastModified() != ((FileInfo)this.currentFile.get()).getLastModified()) {
            String message = "File has been modified since being read: " + fileToRoll;
            throw new IllegalStateException(message);
        }
        if (fileToRoll.length() != ((FileInfo)this.currentFile.get()).getLength()) {
            String message = "File has changed size since being read: " + fileToRoll;
            throw new IllegalStateException(message);
        }
        if (this.deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) {
            this.rollCurrentFile(fileToRoll);
        } else if (this.deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
            this.deleteCurrentFile(fileToRoll);
        } else {
            throw new IllegalArgumentException("Unsupported delete policy: " + this.deletePolicy);
        }
    }

    /*
     * Enabled aggressive block sorting
     */
    private void rollCurrentFile(File fileToRoll) throws IOException {
        File dest = new File(fileToRoll.getPath() + this.completedSuffix);
        logger.info("Preparing to move file {} to {}", (Object)fileToRoll, (Object)dest);
        if (dest.exists() && PlatformDetect.isWindows()) {
            if (Files.equal((File)((FileInfo)this.currentFile.get()).getFile(), (File)dest)) {
                logger.warn("Completed file " + dest + " already exists, but files match, so continuing.");
                boolean deleted = fileToRoll.delete();
                if (deleted) return;
                logger.error("Unable to delete file " + fileToRoll.getAbsolutePath() + ". It will likely be ingested another time.");
                return;
            }
            String message = "File name has been re-used with different files. Spooling assumptions violated for " + dest;
            throw new IllegalStateException(message);
        }
        if (dest.exists()) {
            String message = "File name has been re-used with different files. Spooling assumptions violated for " + dest;
            throw new IllegalStateException(message);
        }
        boolean renamed = fileToRoll.renameTo(dest);
        if (renamed) {
            logger.debug("Successfully rolled file {} to {}", (Object)fileToRoll, (Object)dest);
            this.deleteMetaFile();
            return;
        }
        String message = "Unable to move " + fileToRoll + " to " + dest + ". This will likely cause duplicate events. Please verify that " + "flume has sufficient permissions to perform these operations.";
        throw new FlumeException(message);
    }

    private void deleteCurrentFile(File fileToDelete) throws IOException {
        logger.info("Preparing to delete file {}", (Object)fileToDelete);
        if (!fileToDelete.exists()) {
            logger.warn("Unable to delete nonexistent file: {}", (Object)fileToDelete);
            return;
        }
        if (!fileToDelete.delete()) {
            throw new IOException("Unable to delete spool file: " + fileToDelete);
        }
        this.deleteMetaFile();
    }

    private Optional<FileInfo> getNextFile() {
        List<Object> candidateFiles = Collections.emptyList();
        if (this.consumeOrder != SpoolDirectorySourceConfigurationConstants.ConsumeOrder.RANDOM || this.candidateFileIter == null || !this.candidateFileIter.hasNext()) {
            FileFilter filter = new FileFilter(){

                @Override
                public boolean accept(File candidate) {
                    String fileName = candidate.getName();
                    return !candidate.isDirectory() && !fileName.endsWith(ReliableSpoolingFileEventReader.this.completedSuffix) && !fileName.startsWith(".") && !ReliableSpoolingFileEventReader.this.ignorePattern.matcher(fileName).matches();
                }
            };
            candidateFiles = Arrays.asList(this.spoolDirectory.listFiles(filter));
            ++this.listFilesCount;
            this.candidateFileIter = candidateFiles.iterator();
        }
        if (!this.candidateFileIter.hasNext()) {
            return Optional.absent();
        }
        File selectedFile = this.candidateFileIter.next();
        if (this.consumeOrder == SpoolDirectorySourceConfigurationConstants.ConsumeOrder.RANDOM) {
            return this.openFile(selectedFile);
        }
        if (this.consumeOrder == SpoolDirectorySourceConfigurationConstants.ConsumeOrder.YOUNGEST) {
            for (File file : candidateFiles) {
                long compare = selectedFile.lastModified() - file.lastModified();
                if (compare == 0L) {
                    selectedFile = this.smallerLexicographical(selectedFile, file);
                    continue;
                }
                if (compare >= 0L) continue;
                selectedFile = file;
            }
        } else {
            for (File file : candidateFiles) {
                long compare = selectedFile.lastModified() - file.lastModified();
                if (compare == 0L) {
                    selectedFile = this.smallerLexicographical(selectedFile, file);
                    continue;
                }
                if (compare <= 0L) continue;
                selectedFile = file;
            }
        }
        return this.openFile(selectedFile);
    }

    private File smallerLexicographical(File f1, File f2) {
        if (f1.getName().compareTo(f2.getName()) < 0) {
            return f1;
        }
        return f2;
    }

    private Optional<FileInfo> openFile(File file) {
        try {
            String nextPath = file.getPath();
            DurablePositionTracker tracker = DurablePositionTracker.getInstance(this.metaFile, nextPath);
            if (!tracker.getTarget().equals(nextPath)) {
                tracker.close();
                this.deleteMetaFile();
                tracker = DurablePositionTracker.getInstance(this.metaFile, nextPath);
            }
            Preconditions.checkState((boolean)tracker.getTarget().equals(nextPath), (String)"Tracker target %s does not equal expected filename %s", (Object[])new Object[]{tracker.getTarget(), nextPath});
            ResettableFileInputStream in = new ResettableFileInputStream(file, tracker, 16384, this.inputCharset, this.decodeErrorPolicy);
            EventDeserializer deserializer = EventDeserializerFactory.getInstance(this.deserializerType, this.deserializerContext, in);
            return Optional.of((Object)new FileInfo(file, deserializer));
        }
        catch (FileNotFoundException e) {
            logger.warn("Could not find file: " + file, (Throwable)e);
            return Optional.absent();
        }
        catch (IOException e) {
            logger.error("Exception opening file: " + file, (Throwable)e);
            return Optional.absent();
        }
    }

    private void deleteMetaFile() throws IOException {
        if (this.metaFile.exists() && !this.metaFile.delete()) {
            throw new IOException("Unable to delete old meta file " + this.metaFile);
        }
    }

    public static class Builder {
        private File spoolDirectory;
        private String completedSuffix = "fileSuffix";
        private String ignorePattern = "^$";
        private String trackerDirPath = ".flumespool";
        private Boolean annotateFileName = false;
        private String fileNameHeader = "file";
        private Boolean annotateBaseName = false;
        private String baseNameHeader = "basename";
        private String deserializerType = "LINE";
        private Context deserializerContext = new Context();
        private String deletePolicy = "never";
        private String inputCharset = "UTF-8";
        private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf(SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY.toUpperCase(Locale.ENGLISH));
        private SpoolDirectorySourceConfigurationConstants.ConsumeOrder consumeOrder = SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER;

        public Builder spoolDirectory(File directory) {
            this.spoolDirectory = directory;
            return this;
        }

        public Builder completedSuffix(String completedSuffix) {
            this.completedSuffix = completedSuffix;
            return this;
        }

        public Builder ignorePattern(String ignorePattern) {
            this.ignorePattern = ignorePattern;
            return this;
        }

        public Builder trackerDirPath(String trackerDirPath) {
            this.trackerDirPath = trackerDirPath;
            return this;
        }

        public Builder annotateFileName(Boolean annotateFileName) {
            this.annotateFileName = annotateFileName;
            return this;
        }

        public Builder fileNameHeader(String fileNameHeader) {
            this.fileNameHeader = fileNameHeader;
            return this;
        }

        public Builder annotateBaseName(Boolean annotateBaseName) {
            this.annotateBaseName = annotateBaseName;
            return this;
        }

        public Builder baseNameHeader(String baseNameHeader) {
            this.baseNameHeader = baseNameHeader;
            return this;
        }

        public Builder deserializerType(String deserializerType) {
            this.deserializerType = deserializerType;
            return this;
        }

        public Builder deserializerContext(Context deserializerContext) {
            this.deserializerContext = deserializerContext;
            return this;
        }

        public Builder deletePolicy(String deletePolicy) {
            this.deletePolicy = deletePolicy;
            return this;
        }

        public Builder inputCharset(String inputCharset) {
            this.inputCharset = inputCharset;
            return this;
        }

        public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) {
            this.decodeErrorPolicy = decodeErrorPolicy;
            return this;
        }

        public Builder consumeOrder(SpoolDirectorySourceConfigurationConstants.ConsumeOrder consumeOrder) {
            this.consumeOrder = consumeOrder;
            return this;
        }

        public ReliableSpoolingFileEventReader build() throws IOException {
            return new ReliableSpoolingFileEventReader(this.spoolDirectory, this.completedSuffix, this.ignorePattern, this.trackerDirPath, this.annotateFileName, this.fileNameHeader, this.annotateBaseName, this.baseNameHeader, this.deserializerType, this.deserializerContext, this.deletePolicy, this.inputCharset, this.decodeErrorPolicy, this.consumeOrder);
        }
    }

    @InterfaceAudience.Private
    @InterfaceStability.Unstable
    static enum DeletePolicy {
        NEVER,
        IMMEDIATE,
        DELAY;

    }

    private static class FileInfo {
        private final File file;
        private final long length;
        private final long lastModified;
        private final EventDeserializer deserializer;

        public FileInfo(File file, EventDeserializer deserializer) {
            this.file = file;
            this.length = file.length();
            this.lastModified = file.lastModified();
            this.deserializer = deserializer;
        }

        public long getLength() {
            return this.length;
        }

        public long getLastModified() {
            return this.lastModified;
        }

        public EventDeserializer getDeserializer() {
            return this.deserializer;
        }

        public File getFile() {
            return this.file;
        }
    }
}

