/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.source;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark-project.guava.annotations.VisibleForTesting;
import org.spark-project.guava.base.Preconditions;
import org.spark-project.guava.base.Throwables;

/**
 * Event-driven Flume source that reads events from files in a spooling
 * directory and hands them to the channel processor in batches.
 *
 * <p>A single-threaded scheduled executor runs a {@link SpoolDirectoryRunnable}
 * every {@link #POLL_DELAY_MS} milliseconds. Each run reads batches from a
 * {@link ReliableSpoolingFileEventReader} until the reader returns an empty
 * batch, committing the reader after every batch the channel accepts.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * {@code spoolDir} (required), {@code fileSuffix}, {@code deletePolicy},
 * {@code fileHeader}, {@code fileHeaderKey}, {@code basenameHeader},
 * {@code basenameHeaderKey}, {@code batchSize}, {@code inputCharset},
 * {@code decodeErrorPolicy}, {@code ignorePattern}, {@code trackerDir},
 * {@code deserializer}, {@code deserializer.*}, {@code consumeOrder},
 * {@code bufferMaxLineLength} and {@code maxBackoff}.
 */
public class SpoolDirectorySource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class);
    /** Delay between the end of one polling run and the start of the next. */
    private static final int POLL_DELAY_MS = 500;
    /** Back-off used after the first channel rejection; doubled on each further rejection. */
    private static final int INITIAL_BACKOFF_MS = 250;
    /** Upper bound on how long {@link #stop()} waits for the polling task to finish. */
    private static final long STOP_TIMEOUT_SECONDS = 10L;

    private String completedSuffix;
    private String spoolDirectory;
    private boolean fileHeader;
    private String fileHeaderKey;
    private boolean basenameHeader;
    private String basenameHeaderKey;
    private int batchSize;
    private String ignorePattern;
    private String trackerDirPath;
    private String deserializerType;
    private Context deserializerContext;
    private String deletePolicy;
    private String inputCharset;
    private DecodeErrorPolicy decodeErrorPolicy;
    private volatile boolean hasFatalError = false;
    private SourceCounter sourceCounter;
    ReliableSpoolingFileEventReader reader;
    private ScheduledExecutorService executor;
    private boolean backoff = true;
    // Written by the polling thread and read through hitChannelException() from
    // other threads, so it needs the same visibility guarantee as hasFatalError.
    private volatile boolean hitChannelException = false;
    private int maxBackoff;
    private SpoolDirectorySourceConfigurationConstants.ConsumeOrder consumeOrder;

    /**
     * Builds the spooling file reader from the configured settings and schedules
     * the polling task.
     *
     * @throws FlumeException if the reader cannot be built because of an I/O error
     */
    @Override
    public synchronized void start() {
        logger.info("SpoolDirectorySource source starting with directory: {}", this.spoolDirectory);
        File directory = new File(this.spoolDirectory);
        // Build the reader before creating the executor so that a failed start
        // does not leave behind an executor that nothing will ever use.
        try {
            this.reader = new ReliableSpoolingFileEventReader.Builder()
                    .spoolDirectory(directory)
                    .completedSuffix(this.completedSuffix)
                    .ignorePattern(this.ignorePattern)
                    .trackerDirPath(this.trackerDirPath)
                    .annotateFileName(this.fileHeader)
                    .fileNameHeader(this.fileHeaderKey)
                    .annotateBaseName(this.basenameHeader)
                    .baseNameHeader(this.basenameHeaderKey)
                    .deserializerType(this.deserializerType)
                    .deserializerContext(this.deserializerContext)
                    .deletePolicy(this.deletePolicy)
                    .inputCharset(this.inputCharset)
                    .decodeErrorPolicy(this.decodeErrorPolicy)
                    .consumeOrder(this.consumeOrder)
                    .build();
        } catch (IOException ioe) {
            throw new FlumeException("Error instantiating spooling event parser", ioe);
        }
        this.executor = Executors.newSingleThreadScheduledExecutor();
        SpoolDirectoryRunnable runner = new SpoolDirectoryRunnable(this.reader, this.sourceCounter);
        this.executor.scheduleWithFixedDelay(runner, 0L, POLL_DELAY_MS, TimeUnit.MILLISECONDS);
        super.start();
        logger.debug("SpoolDirectorySource source started");
        this.sourceCounter.start();
    }

    /**
     * Shuts the polling executor down, waiting up to
     * {@link #STOP_TIMEOUT_SECONDS} seconds for the running task before forcing
     * termination, then stops the source and its counter.
     *
     * <p>Safe to call when {@link #start()} never created an executor.
     */
    @Override
    public synchronized void stop() {
        if (this.executor != null) {
            this.executor.shutdown();
            try {
                this.executor.awaitTermination(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            } catch (InterruptedException ex) {
                logger.info("Interrupted while awaiting termination", ex);
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            this.executor.shutdownNow();
        }
        super.stop();
        this.sourceCounter.stop();
        logger.info("SpoolDir source {} stopped. Metrics: {}", this.getName(), this.sourceCounter);
    }

    /** Returns a short description containing the source name and spooling directory. */
    @Override
    public String toString() {
        return "Spool Directory source " + this.getName() + ": { spoolDir: " + this.spoolDirectory + " }";
    }

    /**
     * Reads this source's settings from {@code context}, applying the defaults
     * visible below for every key except the mandatory {@code spoolDir}.
     *
     * @param context the source configuration
     * @throws IllegalStateException if {@code spoolDir} is not set
     */
    @Override
    public synchronized void configure(Context context) {
        this.spoolDirectory = context.getString("spoolDir");
        Preconditions.checkState(this.spoolDirectory != null, "Configuration must specify a spooling directory");
        this.completedSuffix = context.getString("fileSuffix", ".COMPLETED");
        this.deletePolicy = context.getString("deletePolicy", "never");
        this.fileHeader = context.getBoolean("fileHeader", false);
        this.fileHeaderKey = context.getString("fileHeaderKey", "file");
        this.basenameHeader = context.getBoolean("basenameHeader", false);
        this.basenameHeaderKey = context.getString("basenameHeaderKey", "basename");
        this.batchSize = context.getInteger("batchSize", 100);
        this.inputCharset = context.getString("inputCharset", "UTF-8");
        // Enum names are matched case-insensitively by upper-casing the configured value.
        this.decodeErrorPolicy = DecodeErrorPolicy.valueOf(
                context.getString("decodeErrorPolicy", SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY)
                        .toUpperCase(Locale.ENGLISH));
        this.ignorePattern = context.getString("ignorePattern", "^$");
        this.trackerDirPath = context.getString("trackerDir", ".flumespool");
        this.deserializerType = context.getString("deserializer", "LINE");
        this.deserializerContext = new Context(context.getSubProperties("deserializer."));
        this.consumeOrder = SpoolDirectorySourceConfigurationConstants.ConsumeOrder.valueOf(
                context.getString("consumeOrder", SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER.toString())
                        .toUpperCase(Locale.ENGLISH));
        // bufferMaxLineLength is forwarded to the deserializer as maxLineLength,
        // but only when the LINE deserializer is in use.
        Integer bufferMaxLineLength = context.getInteger("bufferMaxLineLength");
        if (bufferMaxLineLength != null && this.deserializerType != null && this.deserializerType.equalsIgnoreCase("LINE")) {
            this.deserializerContext.put("maxLineLength", bufferMaxLineLength.toString());
        }
        this.maxBackoff = context.getInteger("maxBackoff", SpoolDirectorySourceConfigurationConstants.DEFAULT_MAX_BACKOFF);
        // Keep an existing counter across reconfiguration.
        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(this.getName());
        }
    }

    /** Returns whether the polling task has terminated with an uncaught exception. */
    @VisibleForTesting
    protected boolean hasFatalError() {
        return this.hasFatalError;
    }

    /**
     * Enables or disables sleeping between retries after the channel rejects a batch.
     *
     * @param backoff {@code true} to sleep between retries, {@code false} to retry immediately
     */
    @VisibleForTesting
    protected void setBackOff(boolean backoff) {
        this.backoff = backoff;
    }

    /** Returns whether the channel has rejected at least one batch. */
    @VisibleForTesting
    protected boolean hitChannelException() {
        return this.hitChannelException;
    }

    /** Returns the counter used to record received and accepted events. */
    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return this.sourceCounter;
    }

    /**
     * Polling task: drains the reader batch by batch into the channel processor
     * until the reader has nothing more to return or the thread is interrupted.
     */
    private class SpoolDirectoryRunnable implements Runnable {
        private final ReliableSpoolingFileEventReader reader;
        private final SourceCounter sourceCounter;

        public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader, SourceCounter sourceCounter) {
            this.reader = reader;
            this.sourceCounter = sourceCounter;
        }

        /**
         * Reads and forwards batches until an empty batch is returned.
         *
         * <p>When the channel rejects a batch the reader is not committed and the
         * loop retries after an exponentially growing back-off capped at
         * {@code maxBackoff}. NOTE(review): this presumably makes the reader hand
         * out the same batch again - confirm against
         * {@code ReliableSpoolingFileEventReader}.
         *
         * <p>Any other failure is logged, recorded in {@code hasFatalError} and
         * rethrown. An interrupt during back-off is treated as a stop request and
         * not as a failure.
         */
        @Override
        public void run() {
            int backoffInterval = INITIAL_BACKOFF_MS;
            try {
                while (!Thread.interrupted()) {
                    List<Event> events = this.reader.readEvents(SpoolDirectorySource.this.batchSize);
                    if (events.isEmpty()) {
                        break;
                    }
                    this.sourceCounter.addToEventReceivedCount(events.size());
                    this.sourceCounter.incrementAppendBatchReceivedCount();
                    try {
                        SpoolDirectorySource.this.getChannelProcessor().processEventBatch(events);
                        this.reader.commit();
                    } catch (ChannelException ex) {
                        logger.warn("The channel is full, and cannot write data now. The source will try again after " + String.valueOf(backoffInterval) + " milliseconds");
                        SpoolDirectorySource.this.hitChannelException = true;
                        if (SpoolDirectorySource.this.backoff) {
                            TimeUnit.MILLISECONDS.sleep(backoffInterval);
                            // Double in long arithmetic so a very large maxBackoff
                            // cannot make the interval wrap to a negative value.
                            backoffInterval = (int) Math.min((long) backoffInterval * 2L, (long) SpoolDirectorySource.this.maxBackoff);
                        }
                        continue;
                    }
                    backoffInterval = INITIAL_BACKOFF_MS;
                    this.sourceCounter.addToEventAcceptedCount(events.size());
                    this.sourceCounter.incrementAppendBatchAcceptedCount();
                }
            } catch (InterruptedException ie) {
                // Raised only by the back-off sleep, typically because stop()
                // called shutdownNow(). Restore the flag and end this run quietly.
                Thread.currentThread().interrupt();
                logger.debug("Interrupted while backing off; ending this polling run");
            } catch (Throwable t) {
                logger.error("FATAL: " + SpoolDirectorySource.this.toString() + ": Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.", t);
                SpoolDirectorySource.this.hasFatalError = true;
                Throwables.propagate(t);
            }
        }
    }
}

