package org.apache.flink.streaming.api.functions.source;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source function that monitors a single path and forwards the input splits of eligible files
 * as {@link TimestampedFileInputSplit}s.
 *
 * <p>A file is eligible when its modification time is strictly greater than the largest
 * modification time forwarded so far (the "global modification time"). That value is the only
 * piece of state this function checkpoints.
 *
 * <p>With {@link FileProcessingMode#PROCESS_ONCE} the path is scanned a single time and
 * {@link #run} returns. With {@link FileProcessingMode#PROCESS_CONTINUOUSLY} the path is
 * re-scanned every {@code interval} milliseconds until {@link #cancel()} or {@link #close()}
 * is called.
 *
 * <p>Scanning and the updates of the global modification time made by {@link #run},
 * {@link #close()} and {@link #cancel()} happen while holding the checkpoint lock obtained from
 * the source context (except in {@link #cancel()} when no lock has been obtained yet).
 *
 * @param <OUT> the element type produced by the wrapped {@link FileInputFormat}
 */
@Internal
@Deprecated
public class ContinuousFileMonitoringFunction<OUT> extends RichSourceFunction<TimestampedFileInputSplit>
        implements CheckpointedFunction {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileMonitoringFunction.class);

    /** The minimum interval allowed between consecutive path scans, in milliseconds. */
    public static final long MIN_MONITORING_INTERVAL = 1L;

    /** The path to monitor, in string form. */
    private final String path;

    /** Passed to {@link FileInputFormat#createInputSplits(int)} when creating splits. */
    private final int readerParallelism;

    /** The input format used to enumerate files and create the splits. */
    private final FileInputFormat<OUT> format;

    /** The pause between two consecutive scans, in milliseconds. */
    private final long interval;

    /** Whether the path is scanned once or periodically. */
    private final FileProcessingMode watchType;

    /**
     * The largest modification time forwarded so far. {@code Long.MIN_VALUE} means nothing has
     * been forwarded or restored yet; {@code Long.MAX_VALUE} means the function is done, so
     * every file is ignored from then on.
     */
    private volatile long globalModificationTime;

    /** The checkpoint lock of the source context; {@code null} until {@link #run} obtains it. */
    private transient Object checkpointLock;

    private volatile boolean isRunning = true;

    /** Operator state holding at most one element: the global modification time. */
    private transient ListState<Long> checkpointedState;

    /**
     * Creates a monitoring function for the single path configured on the given format.
     *
     * @param fileInputFormat the format used to enumerate files and create splits; must be
     *     configured with exactly one file path
     * @param fileProcessingMode whether to scan the path once or continuously
     * @param readerParallelism the value handed to the format when creating splits; values
     *     below 1 are raised to 1
     * @param interval the pause between scans in milliseconds; must be at least
     *     {@link #MIN_MONITORING_INTERVAL} unless the mode is
     *     {@link FileProcessingMode#PROCESS_ONCE}
     * @throws IllegalArgumentException if the interval is too small or the format does not have
     *     exactly one path
     * @throws NullPointerException if the format or its path is {@code null}
     */
    public ContinuousFileMonitoringFunction(
            FileInputFormat<OUT> fileInputFormat,
            FileProcessingMode fileProcessingMode,
            int readerParallelism,
            long interval) {
        Preconditions.checkArgument(
                fileProcessingMode == FileProcessingMode.PROCESS_ONCE || interval >= MIN_MONITORING_INTERVAL,
                "The specified monitoring interval (" + interval + " ms) is smaller than the minimum allowed one ("
                        + MIN_MONITORING_INTERVAL + " ms).");

        // Null-check the format BEFORE dereferencing it, so a null format is reported with the
        // intended message instead of a bare NullPointerException.
        this.format = Preconditions.checkNotNull(fileInputFormat, "Unspecified File Input Format.");

        Path[] filePaths = this.format.getFilePaths();
        Preconditions.checkArgument(
                filePaths.length == 1, "FileInputFormats with multiple paths are not supported yet.");

        // Likewise, null-check the path itself rather than the result of calling toString() on it.
        this.path = Preconditions.checkNotNull(filePaths[0], "Unspecified Path.").toString();

        this.interval = interval;
        this.watchType = fileProcessingMode;
        this.readerParallelism = Math.max(readerParallelism, 1);
        this.globalModificationTime = Long.MIN_VALUE;
    }

    /** Returns the largest modification time forwarded (or restored) so far. */
    @VisibleForTesting
    public long getGlobalModificationTime() {
        return globalModificationTime;
    }

    /**
     * Registers the operator list state and, when restoring, reads the global modification time
     * back from it.
     *
     * @throws IllegalStateException if this function has already been initialized
     * @throws IllegalArgumentException if the restored state holds more than one element, or if
     *     a value is restored although the global modification time has already been set
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        Preconditions.checkState(
                checkpointedState == null,
                "The " + getClass().getSimpleName() + " has already been initialized.");

        ListStateDescriptor<Long> descriptor =
                new ListStateDescriptor<>("file-monitoring-state", LongSerializer.INSTANCE);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (!context.isRestored()) {
            LOG.info("No state to restore for the {}.", getClass().getSimpleName());
            return;
        }

        LOG.info("Restoring state for the {}.", getClass().getSimpleName());

        List<Long> restoredModTimes = new ArrayList<>();
        for (Long modTime : checkpointedState.get()) {
            restoredModTimes.add(modTime);
        }

        // snapshotState() always writes exactly one element, so more than one is invalid.
        Preconditions.checkArgument(
                restoredModTimes.size() <= 1, getClass().getSimpleName() + " retrieved invalid state.");

        if (restoredModTimes.size() == 1) {
            if (globalModificationTime != Long.MIN_VALUE) {
                throw new IllegalArgumentException(
                        "The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");
            }
            globalModificationTime = restoredModTimes.get(0);
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "{} retrieved a global mod time of {}.",
                        getClass().getSimpleName(),
                        globalModificationTime);
            }
        }
    }

    /** Configures the wrapped input format with an empty {@link Configuration}. */
    @Override
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        format.configure(new Configuration());
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Opened {} (taskIdx= {}) for path: {}",
                    getClass().getSimpleName(),
                    getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
                    path);
        }
    }

    /**
     * Scans the monitored path according to the configured {@link FileProcessingMode} and emits
     * the resulting splits through the given context.
     *
     * @param context the source context used for emitting splits and for its checkpoint lock
     * @throws FileNotFoundException if the monitored path does not exist when this method starts
     */
    @Override
    public void run(SourceFunction.SourceContext<TimestampedFileInputSplit> context) throws Exception {
        Path monitoredPath = new Path(path);
        FileSystem fileSystem = FileSystem.get(monitoredPath.toUri());
        if (!fileSystem.exists(monitoredPath)) {
            throw new FileNotFoundException("The provided file path " + path + " does not exist.");
        }

        checkpointLock = context.getCheckpointLock();

        switch (watchType) {
            case PROCESS_CONTINUOUSLY:
                while (isRunning) {
                    synchronized (checkpointLock) {
                        monitorDirAndForwardSplits(fileSystem, context);
                    }
                    Thread.sleep(interval);
                }
                // isRunning only becomes false through close() or cancel(), which also set the
                // global modification time to Long.MAX_VALUE; nothing more to do here.
                break;
            case PROCESS_ONCE:
                synchronized (checkpointLock) {
                    // Scan only if nothing was forwarded or restored before, so a restore from
                    // a checkpoint taken after the scan does not process the path again.
                    if (globalModificationTime == Long.MIN_VALUE) {
                        monitorDirAndForwardSplits(fileSystem, context);
                        globalModificationTime = Long.MAX_VALUE;
                    }
                    isRunning = false;
                }
                break;
            default:
                isRunning = false;
                throw new RuntimeException("Unknown WatchType" + watchType);
        }
    }

    /**
     * Performs one scan: lists the eligible files, creates their splits and emits them in
     * ascending order of modification time, advancing the global modification time after each
     * group of equally-timestamped splits. Must be called while holding the checkpoint lock.
     */
    private void monitorDirAndForwardSplits(
            FileSystem fileSystem, SourceFunction.SourceContext<TimestampedFileInputSplit> context)
            throws IOException {
        assert Thread.holdsLock(checkpointLock);

        Map<Path, FileStatus> eligibleFiles = listEligibleFiles(fileSystem, new Path(path));
        Map<Long, List<TimestampedFileInputSplit>> splitsSortedByModTime =
                getInputSplitsSortedByModTime(eligibleFiles);

        for (Map.Entry<Long, List<TimestampedFileInputSplit>> splitsForModTime :
                splitsSortedByModTime.entrySet()) {
            long modificationTime = splitsForModTime.getKey();
            for (TimestampedFileInputSplit split : splitsForModTime.getValue()) {
                LOG.info("Forwarding split: {}", split);
                context.collect(split);
            }
            // Advance only after the whole group has been emitted.
            globalModificationTime = Math.max(globalModificationTime, modificationTime);
        }
    }

    /**
     * Creates the input splits of the format and groups those that belong to an eligible file by
     * the modification time of that file.
     *
     * @param eligibleFiles the files to keep splits for, keyed by path
     * @return the splits grouped by modification time, in ascending key order; empty if there
     *     are no eligible files
     */
    private Map<Long, List<TimestampedFileInputSplit>> getInputSplitsSortedByModTime(
            Map<Path, FileStatus> eligibleFiles) throws IOException {
        Map<Long, List<TimestampedFileInputSplit>> splitsByModTime = new TreeMap<>();
        if (eligibleFiles.isEmpty()) {
            return splitsByModTime;
        }

        for (FileInputSplit split : format.createInputSplits(readerParallelism)) {
            FileStatus fileStatus = eligibleFiles.get(split.getPath());
            if (fileStatus == null) {
                // The split belongs to a file that is not eligible in this scan.
                continue;
            }
            long modTime = fileStatus.getModificationTime();
            splitsByModTime
                    .computeIfAbsent(modTime, ignored -> new ArrayList<>())
                    .add(
                            new TimestampedFileInputSplit(
                                    modTime,
                                    split.getSplitNumber(),
                                    split.getPath(),
                                    split.getStart(),
                                    split.getLength(),
                                    split.getHostnames()));
        }
        return splitsByModTime;
    }

    /**
     * Lists the files under the given path that should be processed in this scan. Directories
     * are descended into only when the format has nested file enumeration enabled and accepts
     * the directory.
     *
     * @return the eligible files keyed by path; empty if the path cannot be listed
     */
    private Map<Path, FileStatus> listEligibleFiles(FileSystem fileSystem, Path path) {
        final FileStatus[] statuses;
        try {
            statuses = fileSystem.listStatus(path);
        } catch (IOException e) {
            // Best effort: a failed listing is treated as "no eligible files" instead of
            // failing the source, but the cause is recorded rather than dropped silently.
            LOG.debug("Could not list the status of {}; treating it as having no eligible files.", path, e);
            return Collections.emptyMap();
        }

        if (statuses == null) {
            LOG.warn("Path does not exist: {}", path);
            return Collections.emptyMap();
        }

        Map<Path, FileStatus> eligibleFiles = new HashMap<>();
        for (FileStatus status : statuses) {
            if (!status.isDir()) {
                Path filePath = status.getPath();
                if (!shouldIgnore(filePath, status.getModificationTime())) {
                    eligibleFiles.put(filePath, status);
                }
            } else if (format.getNestedFileEnumeration() && format.acceptFile(status)) {
                eligibleFiles.putAll(listEligibleFiles(fileSystem, status.getPath()));
            }
        }
        return eligibleFiles;
    }

    /**
     * Returns {@code true} if the file should be skipped because its modification time is not
     * greater than the global modification time. Must be called while holding the checkpoint
     * lock.
     *
     * @param filePath the path of the file, used for logging only
     * @param modificationTime the modification time of the file
     */
    private boolean shouldIgnore(Path filePath, long modificationTime) {
        assert Thread.holdsLock(checkpointLock);

        boolean ignore = modificationTime <= globalModificationTime;
        if (ignore && LOG.isDebugEnabled()) {
            LOG.debug(
                    "Ignoring {}, with mod time= {} and global mod time= {}",
                    filePath,
                    modificationTime,
                    globalModificationTime);
        }
        return ignore;
    }

    /**
     * Stops the monitoring loop and marks the function as done. The fields are only touched if
     * {@link #run} has already obtained the checkpoint lock.
     */
    @Override
    public void close() throws Exception {
        super.close();

        if (checkpointLock != null) {
            synchronized (checkpointLock) {
                globalModificationTime = Long.MAX_VALUE;
                isRunning = false;
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Closed File Monitoring Source for path: {}.", path);
        }
    }

    /**
     * Stops the monitoring loop and marks the function as done. The update is made under the
     * checkpoint lock when one has been obtained, and without locking otherwise.
     */
    @Override
    public void cancel() {
        if (checkpointLock == null) {
            // run() has not obtained the lock yet, so there is nothing to synchronize on.
            globalModificationTime = Long.MAX_VALUE;
            isRunning = false;
        } else {
            synchronized (checkpointLock) {
                globalModificationTime = Long.MAX_VALUE;
                isRunning = false;
            }
        }
    }

    /**
     * Replaces the operator state with the current global modification time.
     *
     * @throws IllegalStateException if {@link #initializeState} has not been called
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        Preconditions.checkState(
                checkpointedState != null,
                "The " + getClass().getSimpleName() + " state has not been properly initialized.");

        checkpointedState.update(Collections.singletonList(globalModificationTime));

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime);
        }
    }
}
