/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.clean;

import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CleanPlanner<T, I, K, O>
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(CleanPlanner.class);
    // Clean plan format versions. VERSION_2 and LATEST are assigned in the static initializer at the
    // bottom of this class; LATEST is set to the same value as VERSION_2 there.
    public static final Integer CLEAN_PLAN_VERSION_1 = CleanPlanV1MigrationHandler.VERSION;
    public static final Integer CLEAN_PLAN_VERSION_2;
    public static final Integer LATEST_CLEAN_PLAN_VERSION;
    // Completed-commits timeline, populated on first use by getCommitTimeline(); transient, so it is
    // not part of the serialized form of this planner.
    private transient HoodieTimeline commitTimeline;
    // Pending compaction operations keyed by the file group they target (built in the constructor).
    private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingCompactionOperations;
    // Pending log-compaction operations keyed by the file group they target (built in the constructor).
    private final Map<HoodieFileGroupId, CompactionOperation> fgIdToPendingLogCompactionOperations;
    private final HoodieTable<T, I, K, O> hoodieTable;
    private final HoodieWriteConfig config;
    // Stored by the constructor; not read anywhere else in this class. Transient like commitTimeline.
    private transient HoodieEngineContext context;
    // Savepoint timestamps captured at construction time; left empty for metadata tables and for
    // non-partitioned tables (see the constructor).
    private final List<String> savepointedTimestamps;
    // Cache for getEarliestCommitToRetain(); starts empty and is filled on first call.
    private Option<HoodieInstant> earliestCommitToRetain = Option.empty();

    /**
     * Creates a planner bound to the given table and write config.
     *
     * <p>Snapshots the pending compaction and pending log-compaction operations from the table's
     * slice view, keyed by file group id, and captures the table's savepoint timestamps. Savepoint
     * timestamps are only captured for partitioned, non-metadata tables; otherwise the list is empty.
     *
     * @param context     engine context, stored on the instance
     * @param hoodieTable table whose files are to be planned for cleaning
     * @param config      write config supplying the cleaner settings
     */
    public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig config) {
        this.context = context;
        this.hoodieTable = hoodieTable;
        this.config = config;

        SyncableFileSystemView sliceView = (SyncableFileSystemView)hoodieTable.getSliceView();
        this.fgIdToPendingCompactionOperations = sliceView.getPendingCompactionOperations()
                .map(pending -> {
                    CompactionOperation operation = (CompactionOperation)pending.getValue();
                    return Pair.of(new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId()), operation);
                })
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
        this.fgIdToPendingLogCompactionOperations = sliceView.getPendingLogCompactionOperations()
                .map(pending -> {
                    CompactionOperation operation = (CompactionOperation)pending.getValue();
                    return Pair.of(new HoodieFileGroupId(operation.getPartitionPath(), operation.getFileId()), operation);
                })
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));

        if (!hoodieTable.isMetadataTable() && hoodieTable.isPartitioned()) {
            this.savepointedTimestamps = new ArrayList<String>(hoodieTable.getSavepointTimestamps());
        } else {
            this.savepointedTimestamps = Collections.emptyList();
        }
    }

    /**
     * Returns the table's completed-commits timeline, fetching it from the table on first use and
     * reusing the stored instance afterwards.
     */
    private HoodieTimeline getCommitTimeline() {
        HoodieTimeline timeline = this.commitTimeline;
        if (timeline == null) {
            timeline = this.hoodieTable.getCompletedCommitsTimeline();
            this.commitTimeline = timeline;
        }
        return timeline;
    }

    /**
     * Returns the savepoint timestamps captured when this planner was constructed.
     *
     * @return the list held by this planner (not a copy)
     */
    List<String> getSavepointedTimestamps() {
        return savepointedTimestamps;
    }

    /**
     * Streams every data file recorded in the metadata of the given savepoint, across all of the
     * partitions that metadata lists.
     *
     * @param savepointTime timestamp of the savepoint to read
     * @return stream of the savepointed data file entries
     * @throws HoodieSavepointException if the savepoint does not exist or its metadata cannot be read
     */
    public Stream<String> getSavepointedDataFiles(String savepointTime) {
        return this.getSavepointMetadata(savepointTime)
                .getPartitionMetadata()
                .values()
                .stream()
                .flatMap(partitionMetadata -> partitionMetadata.getSavepointDataFile().stream());
    }

    /**
     * Loads and deserializes the metadata of a completed savepoint from the active timeline.
     *
     * @param savepointTimestamp timestamp of the savepoint
     * @return the deserialized savepoint metadata
     * @throws HoodieSavepointException if the table has no savepoint with that timestamp, or if
     *                                  deserialization fails with an {@link IOException}
     */
    private HoodieSavepointMetadata getSavepointMetadata(String savepointTimestamp) {
        if (this.hoodieTable.getSavepointTimestamps().contains(savepointTimestamp)) {
            HoodieInstant savepointInstant = this.hoodieTable.getMetaClient()
                    .createNewInstant(HoodieInstant.State.COMPLETED, "savepoint", savepointTimestamp);
            try {
                return TimelineMetadataUtils.deserializeHoodieSavepointMetadata(
                        this.hoodieTable.getActiveTimeline().getInstantDetails(savepointInstant).get());
            } catch (IOException e) {
                throw new HoodieSavepointException("Could not get savepointed data files for savepoint " + savepointTimestamp, e);
            }
        }
        throw new HoodieSavepointException("Could not get data files for savepoint " + savepointTimestamp + ". No such savepoint.");
    }

    /**
     * Returns the partition paths that should be examined for cleaning under the configured policy.
     *
     * <p>The commit-based and hour-based policies go through the commit-driven selection (which may
     * be incremental); the file-version policy always considers every partition.
     *
     * @param earliestRetainedInstant earliest instant that must be retained, if any
     * @return partition paths to plan cleaning for
     * @throws IOException           if reading timeline metadata fails
     * @throws IllegalStateException if the configured policy is not one of the handled values
     */
    public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
        HoodieCleaningPolicy policy = this.config.getCleanerPolicy();
        switch (policy) {
            case KEEP_LATEST_FILE_VERSIONS:
                return this.getPartitionPathsForFullCleaning();
            case KEEP_LATEST_COMMITS:
            case KEEP_LATEST_BY_HOURS:
                return this.getPartitionPathsForCleanByCommits(earliestRetainedInstant);
            default:
                throw new IllegalStateException("Unknown Cleaner Policy");
        }
    }

    /**
     * Selects partitions for the commit-driven policies.
     *
     * <p>Returns nothing when there is no instant to retain. When incremental cleaning is enabled and
     * the last completed clean carries a usable "earliest commit to retain" that is not before the
     * start of the commits timeline, only partitions changed since then are returned. In every other
     * case all partitions are returned. An empty last-clean instant is deleted from the active
     * timeline as a side effect before falling back to the full listing.
     *
     * @param instantToRetain earliest instant that must be retained, if any
     * @return partition paths to plan cleaning for
     * @throws IOException if the last clean's metadata cannot be deserialized
     */
    private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
        if (!instantToRetain.isPresent()) {
            LOG.info("No earliest commit to retain. No need to scan partitions !!");
            return Collections.emptyList();
        }
        if (!this.config.incrementalCleanerModeEnabled()) {
            return this.getPartitionPathsForFullCleaning();
        }

        Option<HoodieInstant> lastCompletedClean = this.hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
        if (!lastCompletedClean.isPresent()) {
            return this.getPartitionPathsForFullCleaning();
        }

        HoodieInstant lastCleanInstant = lastCompletedClean.get();
        if (this.hoodieTable.getActiveTimeline().isEmpty(lastCleanInstant)) {
            this.hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastCleanInstant);
            return this.getPartitionPathsForFullCleaning();
        }

        HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils.deserializeHoodieCleanMetadata(
                this.hoodieTable.getActiveTimeline().getInstantDetails(lastCleanInstant).get());
        String previousEarliestCommit = cleanMetadata.getEarliestCommitToRetain();
        boolean canCleanIncrementally = previousEarliestCommit != null
                && !previousEarliestCommit.trim().isEmpty()
                && !this.hoodieTable.getActiveTimeline().getCommitsTimeline().isBeforeTimelineStarts(previousEarliestCommit);
        if (canCleanIncrementally) {
            return this.getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
        }
        return this.getPartitionPathsForFullCleaning();
    }

    /**
     * Returns the distinct partitions touched by completed commits whose requested time lies in
     * {@code [previous earliest-commit-to-retain, new instant to retain)}.
     *
     * <p>If any savepoint recorded by the previous clean is no longer present, every partition is
     * returned instead.
     *
     * @param cleanMetadata      metadata of the last completed clean
     * @param newInstantToRetain earliest instant to retain for this clean; its value is read
     *                           unconditionally while filtering
     * @return partition paths to plan cleaning for
     */
    private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata, Option<HoodieInstant> newInstantToRetain) {
        if (this.isAnySavepointDeleted(cleanMetadata)) {
            LOG.info("Since savepoints have been removed compared to previous clean, triggering clean planning for all partitions");
            return this.getPartitionPathsForFullCleaning();
        }
        LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have changed since last clean at {}. New Instant to retain {}.", (Object)cleanMetadata.getEarliestCommitToRetain(), newInstantToRetain);
        return this.hoodieTable.getCompletedCommitsTimeline()
                .getInstantsAsStream()
                .filter(instant -> {
                    boolean atOrAfterPreviousBoundary = InstantComparison.compareTimestamps(
                            instant.requestedTime(), InstantComparison.GREATER_THAN_OR_EQUALS, cleanMetadata.getEarliestCommitToRetain());
                    if (!atOrAfterPreviousBoundary) {
                        return false;
                    }
                    return InstantComparison.compareTimestamps(
                            instant.requestedTime(), InstantComparison.LESSER_THAN, newInstantToRetain.get().requestedTime());
                })
                .flatMap(this::getPartitionsForInstants)
                .distinct()
                .collect(Collectors.toList());
    }

    /**
     * Tells whether a savepoint that existed at the previous clean has since been removed.
     *
     * <p>The previous clean's savepoints are read from the {@code "savepointed_timestamps"} entry of
     * its extra metadata (a comma-separated list; blank entries are ignored) and compared with the
     * savepoint timestamps this planner captured at construction time.
     *
     * @param cleanMetadata metadata of the last completed clean
     * @return {@code true} if at least one previously recorded savepoint timestamp is no longer
     *         present; {@code false} if the previous clean recorded none or all still exist
     */
    private boolean isAnySavepointDeleted(HoodieCleanMetadata cleanMetadata) {
        if (cleanMetadata.getExtraMetadata() == null) {
            return false;
        }
        List<String> savepointsAtLastClean = Arrays.stream(cleanMetadata.getExtraMetadata().getOrDefault("savepointed_timestamps", "").split(","))
                .filter(timestamp -> !StringUtils.isNullOrEmpty(timestamp))
                .collect(Collectors.toList());
        // A savepoint was deleted iff some previously recorded timestamp is missing now.
        // containsAll on an empty list is true, so "nothing recorded" yields false as before.
        return !this.savepointedTimestamps.containsAll(savepointsAtLastClean);
    }

    /**
     * Streams the partitions referenced by the commit metadata of the given instant.
     *
     * <p>For a {@code replacecommit} this is the partitions with replaced file ids followed by the
     * partitions with write stats (duplicates are not removed here); for any other action it is the
     * partitions with write stats.
     *
     * @param instant instant whose metadata is read from the active timeline
     * @return stream of partition paths
     * @throws HoodieIOException if the metadata cannot be deserialized
     */
    private Stream<String> getPartitionsForInstants(HoodieInstant instant) {
        try {
            if (!"replacecommit".equals(instant.getAction())) {
                HoodieCommitMetadata commitMetadata = this.hoodieTable.getMetaClient().getCommitMetadataSerDe()
                        .deserialize(instant, this.hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
                return commitMetadata.getPartitionToWriteStats().keySet().stream();
            }
            HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
                    this.hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
            Stream<String> replacedPartitions = replaceMetadata.getPartitionToReplaceFileIds().keySet().stream();
            Stream<String> writtenPartitions = replaceMetadata.getPartitionToWriteStats().keySet().stream();
            return Stream.concat(replacedPartitions, writtenPartitions);
        } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }

    /**
     * Returns every partition path reported by the table's metadata table.
     *
     * @return all partition paths
     * @throws HoodieIOException if the listing fails with an {@link IOException}
     */
    private List<String> getPartitionPathsForFullCleaning() {
        List<String> allPartitionPaths;
        try {
            allPartitionPaths = this.hoodieTable.getMetadataTable().getAllPartitionPaths();
        } catch (IOException ioe) {
            throw new HoodieIOException("Fetching all partitions failed ", ioe);
        }
        return allPartitionPaths;
    }

    /**
     * Checks whether any file of the slice is protected by a savepoint.
     *
     * @param fs               file slice to inspect
     * @param savepointedFiles file names referenced by savepoints
     * @return {@code true} if the slice's base file name or any of its log file names is contained
     *         in {@code savepointedFiles}
     */
    private boolean isFileSliceExistInSavepointedFiles(FileSlice fs, List<String> savepointedFiles) {
        boolean baseFileSavepointed = fs.getBaseFile().isPresent()
                && savepointedFiles.contains(fs.getBaseFile().get().getFileName());
        if (baseFileSavepointed) {
            return true;
        }
        return fs.getLogFiles().anyMatch(logFile -> savepointedFiles.contains(logFile.getFileName()));
    }

    /**
     * Plans cleaning for one partition under the "keep latest file versions" policy.
     *
     * <p>All replaced file groups in the partition are eligible (minus savepointed slices). For each
     * remaining file group, slices needed by a pending compaction or log compaction are skipped, the
     * configured number of versions is kept (one fewer if the group has a pending compaction, since
     * that slice was already skipped), and every later slice from the iterator that is not
     * savepointed is scheduled for deletion.
     *
     * @param partitionPath partition to plan
     * @return pair of (whether the partition itself should be deleted, files to delete). The
     *         partition is flagged for deletion only when it has no file groups and no pending files.
     */
    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
        LOG.info("Cleaning {}, retaining latest {} file versions.", partitionPath, this.config.getCleanerFileVersionsRetained());
        List<String> savepointedFiles = this.hoodieTable.getSavepointTimestamps().stream()
                .flatMap(this::getSavepointedDataFiles)
                .collect(Collectors.toList());
        List<CleanFileInfo> deletePaths = new ArrayList<>(this.getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));

        List<HoodieFileGroup> fileGroups = this.hoodieTable.getHoodieView()
                .getAllFileGroupsStateless(partitionPath)
                .collect(Collectors.toList());
        for (HoodieFileGroup fileGroup : fileGroups) {
            int keepVersions = this.config.getCleanerFileVersionsRetained();
            // Slices required by a pending (log) compaction are never candidates for deletion.
            Iterator<FileSlice> fileSliceIterator = fileGroup.getAllFileSlices()
                    .filter(slice -> !this.isFileSliceNeededForPendingMajorOrMinorCompaction(slice))
                    .iterator();
            if (this.isFileGroupInPendingMajorOrMinorCompaction(fileGroup)) {
                // The slice held back for the pending compaction already counts as one kept version.
                --keepVersions;
            }
            // Skip the versions that must be retained.
            while (fileSliceIterator.hasNext() && keepVersions > 0) {
                fileSliceIterator.next();
                --keepVersions;
            }
            // Everything that remains is deletable unless a savepoint references it.
            while (fileSliceIterator.hasNext()) {
                FileSlice nextSlice = fileSliceIterator.next();
                if (this.isFileSliceExistInSavepointedFiles(nextSlice, savepointedFiles)) {
                    continue;
                }
                deletePaths.addAll(this.getCleanFileInfoForSlice(nextSlice));
            }
        }

        boolean toDeletePartition = fileGroups.isEmpty() && !this.hasPendingFiles(partitionPath);
        return Pair.of(toDeletePartition, deletePaths);
    }

    /**
     * Plans cleaning for one partition under the {@code KEEP_LATEST_COMMITS} policy, using the
     * configured number of commits to retain.
     *
     * @param partitionPath          partition to plan
     * @param earliestCommitToRetain earliest commit that must be retained
     * @return pair of (whether the partition itself should be deleted, files to delete)
     */
    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
        int commitsRetained = this.config.getCleanerCommitsRetained();
        return this.getFilesToCleanKeepingLatestCommits(
                partitionPath, commitsRetained, earliestCommitToRetain, HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
    }

    /**
     * Plans cleaning for one partition under a commit-driven policy.
     *
     * <p>Nothing is cleaned unless the completed-commits timeline holds more than
     * {@code commitsRetained} instants. Otherwise, replaced file groups before the earliest commit to
     * retain are eligible, and within each live file group a slice is deleted only if all of the
     * following hold: it is not referenced by a savepoint, it is not the first slice in the group's
     * list, it is not the first slice strictly before the earliest commit to retain, it is not needed
     * by a pending compaction or log compaction, and its base instant time is strictly before the
     * earliest commit to retain.
     *
     * @param partitionPath          partition to plan
     * @param commitsRetained        minimum number of completed commits that must be exceeded before
     *                               any cleaning happens
     * @param earliestCommitToRetain earliest commit that must be retained; its value is read
     *                               unconditionally once the commit-count check passes
     * @param policy                 must be {@code KEEP_LATEST_COMMITS} or {@code KEEP_LATEST_BY_HOURS}
     * @return pair of (whether the partition itself should be deleted, files to delete). The
     *         partition is flagged only when it has no file groups, no pending files, and no file
     *         group replaced at or after the earliest commit to retain.
     * @throws IllegalArgumentException if {@code policy} is any other value
     */
    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, Option<HoodieInstant> earliestCommitToRetain, HoodieCleaningPolicy policy) {
        if (policy != HoodieCleaningPolicy.KEEP_LATEST_COMMITS && policy != HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
            throw new IllegalArgumentException("getFilesToCleanKeepingLatestCommits can only be used for KEEP_LATEST_COMMITS or KEEP_LATEST_BY_HOURS");
        }
        LOG.info("Cleaning {}, retaining latest {} commits. ", partitionPath, commitsRetained);
        List<CleanFileInfo> deletePaths = new ArrayList<>();
        List<String> savepointedFiles = this.hoodieTable.getSavepointTimestamps().stream()
                .flatMap(this::getSavepointedDataFiles)
                .collect(Collectors.toList());

        // Not enough commits yet: nothing to clean and the partition is kept.
        if (this.getCommitTimeline().countInstants() <= commitsRetained) {
            return Pair.of(false, deletePaths);
        }

        HoodieInstant earliestInstant = earliestCommitToRetain.get();
        deletePaths.addAll(this.getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetain));

        List<HoodieFileGroup> fileGroups = this.hoodieTable.getHoodieView()
                .getAllFileGroupsStateless(partitionPath)
                .collect(Collectors.toList());
        for (HoodieFileGroup fileGroup : fileGroups) {
            List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
            if (fileSliceList.isEmpty()) {
                continue;
            }
            String lastVersion = fileSliceList.get(0).getBaseInstantTime();
            // May be null when no slice precedes the earliest commit to retain; equals(null) is false.
            String lastVersionBeforeEarliestCommitToRetain = this.getLatestVersionBeforeCommit(fileSliceList, earliestInstant);
            for (FileSlice aSlice : fileSliceList) {
                String fileCommitTime = aSlice.getBaseInstantTime();
                boolean mustKeep = this.isFileSliceExistInSavepointedFiles(aSlice, savepointedFiles)
                        || fileCommitTime.equals(lastVersion)
                        || fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain)
                        || this.isFileSliceNeededForPendingMajorOrMinorCompaction(aSlice)
                        || !InstantComparison.compareTimestamps(earliestInstant.requestedTime(), InstantComparison.GREATER_THAN, fileCommitTime);
                if (mustKeep) {
                    continue;
                }
                // Base file, optional bootstrap base file, then log files - same as the shared helper.
                deletePaths.addAll(this.getCleanFileInfoForSlice(aSlice));
            }
        }

        boolean toDeletePartition = fileGroups.isEmpty()
                && !this.hasPendingFiles(partitionPath)
                && this.noSubsequentReplaceCommit(earliestInstant.requestedTime(), partitionPath);
        return Pair.of(toDeletePartition, deletePaths);
    }

    /**
     * Checks whether the partition still contains any data files, by building a fresh file-system
     * view over the files currently listed under the partition directory.
     *
     * <p>Any exception raised during the check is logged and treated as "files exist", so callers
     * never delete a partition on the strength of a failed check.
     *
     * @param partitionPath partition path relative to the table base path
     * @return {@code true} if at least one file group is found or the check failed
     */
    private boolean hasPendingFiles(String partitionPath) {
        try {
            HoodieTableFileSystemView freshView = new HoodieTableFileSystemView(
                    this.hoodieTable.getMetaClient(), this.hoodieTable.getActiveTimeline());
            StoragePath partitionDir = new StoragePath(this.hoodieTable.getMetaClient().getBasePath(), partitionPath);
            freshView.addFilesToView(
                    partitionPath, FSUtils.getAllDataFilesInPartition(this.hoodieTable.getStorage(), partitionDir));
            boolean anyFileGroup = freshView.getAllFileGroups(partitionPath).findAny().isPresent();
            return anyFileGroup;
        } catch (Exception ex) {
            LOG.warn("Error while checking the pending files under partition: " + partitionPath + ", assumes the files exist", ex);
            return true;
        }
    }

    /**
     * Plans cleaning for one partition under the {@code KEEP_LATEST_BY_HOURS} policy.
     *
     * <p>Delegates to the commit-driven planner with a retained-commit count of 0, so cleaning
     * proceeds whenever the completed-commits timeline has at least one instant.
     *
     * @param partitionPath          partition to plan
     * @param earliestCommitToRetain earliest commit that must be retained
     * @return pair of (whether the partition itself should be deleted, files to delete)
     */
    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
        final int commitsRetained = 0;
        return this.getFilesToCleanKeepingLatestCommits(
                partitionPath, commitsRetained, earliestCommitToRetain, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
    }

    /**
     * Collects the files of replaced file groups in a partition that may be deleted.
     *
     * <p>When an earliest commit to retain is given, only file groups replaced before it are
     * considered; otherwise all replaced file groups are. Slices referenced by a savepoint are
     * excluded.
     *
     * @param savepointedFiles       file names referenced by savepoints
     * @param partitionPath          partition to inspect
     * @param earliestCommitToRetain optional upper bound on the replacing instant
     * @return files of the eligible replaced slices
     */
    private List<CleanFileInfo> getReplacedFilesEligibleToClean(List<String> savepointedFiles, String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
        Stream<HoodieFileGroup> replacedGroups;
        if (earliestCommitToRetain.isPresent()) {
            replacedGroups = this.hoodieTable.getHoodieView()
                    .getReplacedFileGroupsBefore(earliestCommitToRetain.get().requestedTime(), partitionPath);
        } else {
            replacedGroups = this.hoodieTable.getHoodieView().getAllReplacedFileGroups(partitionPath);
        }
        return replacedGroups
                .flatMap(HoodieFileGroup::getAllFileSlices)
                .filter(slice -> !this.isFileSliceExistInSavepointedFiles(slice, savepointedFiles))
                .map(this::getCleanFileInfoForSlice)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    /**
     * Finds the base instant time of the first slice, in list order, that is strictly earlier than
     * the given instant's requested time.
     *
     * @param fileSliceList slices to scan in order (presumably newest first - confirm with the
     *                      file-group API)
     * @param instantTime   instant to compare against
     * @return the matching base instant time, or {@code null} if no slice precedes the instant
     */
    private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, HoodieInstant instantTime) {
        for (FileSlice slice : fileSliceList) {
            String baseInstantTime = slice.getBaseInstantTime();
            boolean isBeforeCommit = InstantComparison.compareTimestamps(
                    instantTime.requestedTime(), InstantComparison.GREATER_THAN, baseInstantTime);
            if (isBeforeCommit) {
                return baseInstantTime;
            }
        }
        return null;
    }

    /**
     * Lists every file belonging to a slice as a deletion candidate.
     *
     * <p>Order of the result: the base file (if any), then its bootstrap base file (only if present
     * and the config allows cleaning bootstrap base files), then all log files.
     *
     * @param nextSlice slice whose files are listed
     * @return deletion entries for the slice's files
     */
    private List<CleanFileInfo> getCleanFileInfoForSlice(FileSlice nextSlice) {
        List<CleanFileInfo> cleanPaths = new ArrayList<>();
        nextSlice.getBaseFile().ifPresent(baseFile -> {
            cleanPaths.add(new CleanFileInfo(baseFile.getPath(), false));
            if (baseFile.getBootstrapBaseFile().isPresent() && this.config.shouldCleanBootstrapBaseFile()) {
                cleanPaths.add(new CleanFileInfo(baseFile.getBootstrapBaseFile().get().getPath(), true));
            }
        });
        nextSlice.getLogFiles()
                .forEach(logFile -> cleanPaths.add(new CleanFileInfo(logFile.getPath().toString(), false)));
        return cleanPaths;
    }

    /**
     * Computes the files to delete in a partition according to the configured cleaning policy, and
     * logs how many were selected.
     *
     * @param partitionPath          partition to plan
     * @param earliestCommitToRetain earliest commit that must be retained; not used by the
     *                               file-version policy
     * @return pair of (whether the partition itself should be deleted, files to delete)
     * @throws IllegalArgumentException if the configured policy is not one of the handled values
     */
    public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath, Option<HoodieInstant> earliestCommitToRetain) {
        HoodieCleaningPolicy policy = this.config.getCleanerPolicy();
        Pair<Boolean, List<CleanFileInfo>> deletePaths;
        switch (policy) {
            case KEEP_LATEST_COMMITS:
                deletePaths = this.getFilesToCleanKeepingLatestCommits(partitionPath, earliestCommitToRetain);
                break;
            case KEEP_LATEST_FILE_VERSIONS:
                deletePaths = this.getFilesToCleanKeepingLatestVersions(partitionPath);
                break;
            case KEEP_LATEST_BY_HOURS:
                deletePaths = this.getFilesToCleanKeepingLatestHours(partitionPath, earliestCommitToRetain);
                break;
            default:
                throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
        }
        LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
        boolean deleteWholePartition = deletePaths.getKey();
        if (deleteWholePartition) {
            LOG.info("Partition " + partitionPath + " to be deleted");
        }
        return deletePaths;
    }

    /**
     * Returns the earliest commit that must be retained under the configured cleaning policy.
     *
     * <p>The value is computed via {@code CleanerUtils} using the current time and stored on this
     * instance. A present result is reused on later calls; an empty result is recomputed on each call.
     *
     * @return the earliest commit to retain, or empty if none was determined
     */
    public Option<HoodieInstant> getEarliestCommitToRetain() {
        if (this.earliestCommitToRetain.isPresent()) {
            return this.earliestCommitToRetain;
        }
        this.earliestCommitToRetain = CleanerUtils.getEarliestCommitToRetain(
                this.hoodieTable.getMetaClient().getActiveTimeline().getCommitsAndCompactionTimeline(),
                this.config.getCleanerPolicy(),
                this.config.getCleanerCommitsRetained(),
                Instant.now(),
                this.config.getCleanerHoursRetained(),
                this.hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone());
        return this.earliestCommitToRetain;
    }

    /**
     * Returns the requested time of the last instant on the completed-commits timeline.
     *
     * @return the timestamp, or an empty string when the timeline has no last instant
     */
    public String getLastCompletedCommitTimestamp() {
        Option<HoodieInstant> lastCompleted = this.getCommitTimeline().lastInstant();
        Option<String> lastTimestamp = lastCompleted.map(HoodieInstant::requestedTime);
        return lastTimestamp.orElse("");
    }

    /**
     * Tells whether the slice must be kept because a pending compaction or a pending log compaction
     * on its file group depends on it.
     *
     * @param fileSlice slice to check
     * @return {@code true} if either kind of pending operation needs the slice
     */
    private boolean isFileSliceNeededForPendingMajorOrMinorCompaction(FileSlice fileSlice) {
        if (this.isFileSliceNeededForPendingCompaction(fileSlice)) {
            return true;
        }
        return this.isFileSliceNeededForPendingLogCompaction(fileSlice);
    }

    /**
     * Tells whether a pending compaction on the slice's file group needs this slice.
     *
     * @param fileSlice slice to check
     * @return {@code true} if the file group has a pending compaction and the slice's base instant
     *         time is at or after that operation's base instant time; {@code false} otherwise
     */
    private boolean isFileSliceNeededForPendingCompaction(FileSlice fileSlice) {
        CompactionOperation pendingOperation = this.fgIdToPendingCompactionOperations.get(fileSlice.getFileGroupId());
        return pendingOperation != null
                && InstantComparison.compareTimestamps(
                        fileSlice.getBaseInstantTime(), InstantComparison.GREATER_THAN_OR_EQUALS, pendingOperation.getBaseInstantTime());
    }

    /**
     * Tells whether a pending log compaction on the slice's file group needs this slice.
     *
     * @param fileSlice slice to check
     * @return {@code true} if the file group has a pending log compaction and the slice's base
     *         instant time is at or after that operation's base instant time; {@code false} otherwise
     */
    private boolean isFileSliceNeededForPendingLogCompaction(FileSlice fileSlice) {
        CompactionOperation pendingOperation = this.fgIdToPendingLogCompactionOperations.get(fileSlice.getFileGroupId());
        return pendingOperation != null
                && InstantComparison.compareTimestamps(
                        fileSlice.getBaseInstantTime(), InstantComparison.GREATER_THAN_OR_EQUALS, pendingOperation.getBaseInstantTime());
    }

    /**
     * Tells whether the file group has a pending compaction or a pending log compaction.
     *
     * @param fg file group to check
     * @return {@code true} if its id is a key in either pending-operation map
     */
    private boolean isFileGroupInPendingMajorOrMinorCompaction(HoodieFileGroup fg) {
        HoodieFileGroupId groupId = fg.getFileGroupId();
        return this.fgIdToPendingCompactionOperations.containsKey(groupId)
                || this.fgIdToPendingLogCompactionOperations.containsKey(groupId);
    }

    /**
     * Tells whether the partition has no file group replaced at or after the given instant time.
     *
     * @param earliestCommitToRetain instant time used as the inclusive lower bound
     * @param partitionPath          partition to inspect
     * @return {@code true} if the file-system view reports no such replaced file group
     */
    private boolean noSubsequentReplaceCommit(String earliestCommitToRetain, String partitionPath) {
        boolean replacedAtOrAfter = this.hoodieTable.getHoodieView()
                .getReplacedFileGroupsAfterOrOn(earliestCommitToRetain, partitionPath)
                .findAny()
                .isPresent();
        return !replacedAtOrAfter;
    }

    // Version 2 is the latest clean plan format: both constants take the V2 handler's version.
    static {
        CLEAN_PLAN_VERSION_2 = CleanPlanV2MigrationHandler.VERSION;
        LATEST_CLEAN_PLAN_VERSION = CLEAN_PLAN_VERSION_2;
    }
}

