/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.execution.bulkinsert;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hudi.common.model.ConsistentHashingNode;
import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.execution.bulkinsert.RowRecordKeyExtractor;
import org.apache.hudi.index.bucket.ConsistentBucketIdentifier;
import org.apache.hudi.index.bucket.ConsistentBucketIndexUtils;
import org.apache.hudi.index.bucket.HoodieSparkConsistentBucketIndex;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.ConsistentHashingBucketInsertPartitioner;
import org.apache.hudi.table.HoodieTable;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.Tuple2;

/**
 * {@link BulkInsertPartitioner} for {@code Dataset<Row>} inputs on tables that use the consistent
 * hashing bucket index.
 *
 * <p>Every row is routed to the Spark partition whose index is assigned to the file-id prefix of
 * the consistent-hashing bucket its record key falls into. Rows are then optionally sorted within
 * each partition, either by the user-supplied clustering sort columns or by the record-key meta
 * field.
 *
 * <p>Only MERGE_ON_READ tables are accepted. The bucket routing state is stored on the instance and
 * rebuilt by every {@link #repartitionRecords} call.
 */
public class ConsistentBucketIndexBulkInsertPartitionerWithRows
    implements BulkInsertPartitioner<Dataset<Row>>, ConsistentHashingBucketInsertPartitioner {
    private final HoodieTable table;
    /** Hash field setting passed to the bucket identifier when locating a record key's bucket. */
    private final String indexKeyFields;
    /** User-requested sort columns (trimmed, non-empty); an empty array when none were configured. */
    private final String[] sortColumnNames;
    /** File-id prefixes of all buckets seen by the last repartition; its size is the bucket count. */
    private final List<String> fileIdPfxList = new ArrayList<>();
    /** Child nodes registered per partition path; applied to the loaded metadata via setChildrenNodes. */
    private final Map<String, List<ConsistentHashingNode>> hashingChildrenNodes;
    /** Partition path to its bucket identifier, rebuilt on every repartition. */
    private Map<String, ConsistentBucketIdentifier> partitionToIdentifier;
    /** Present only when meta fields are not populated. */
    private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
    /** Partition path to (file-id prefix to Spark partition index), rebuilt on every repartition. */
    private Map<String, Map<String, Integer>> partitionToFileIdPfxIdxMap;
    private final RowRecordKeyExtractor extractor;
    private final boolean populateMetaFields;

    /**
     * Creates the partitioner.
     *
     * @param table              table being written; must be a MERGE_ON_READ table
     * @param strategyParams     clustering strategy parameters; the comma-separated value under
     *                           {@code HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS} selects the
     *                           columns to sort by within each output partition
     * @param populateMetaFields whether Hudi meta fields are populated on the rows; when false, a key
     *                           generator built from the write config is handed to the row key extractor
     */
    public ConsistentBucketIndexBulkInsertPartitionerWithRows(HoodieTable table, Map<String, String> strategyParams, boolean populateMetaFields) {
        // Check the precondition first so an unsupported table fails before any key generator is built.
        ValidationUtils.checkArgument(table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
            "Consistent hash bucket index doesn't support CoW table");
        this.table = table;
        this.indexKeyFields = table.getConfig().getBucketIndexHashField();
        this.hashingChildrenNodes = new HashMap<>();
        this.populateMetaFields = populateMetaFields;
        this.keyGeneratorOpt = populateMetaFields
            ? Option.empty()
            : HoodieSparkKeyGeneratorFactory.getKeyGenerator(table.getConfig().getProps());
        this.sortColumnNames = parseSortColumns(
            strategyParams.getOrDefault(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key(), ""));
        this.extractor = RowRecordKeyExtractor.getRowRecordKeyExtractor(populateMetaFields, this.keyGeneratorOpt);
    }

    /**
     * Splits a comma-separated sort-column setting into trimmed, non-empty column names.
     *
     * @param sortString raw setting value; may be null or empty
     * @return the column names, or an empty array when none are configured
     */
    private static String[] parseSortColumns(String sortString) {
        if (StringUtils.isNullOrEmpty(sortString)) {
            return new String[0];
        }
        // Trim so "a, b" resolves to columns "a" and "b" rather than a non-existent " b".
        return Arrays.stream(sortString.split(","))
            .map(String::trim)
            .filter(name -> !name.isEmpty())
            .toArray(String[]::new);
    }

    /**
     * Builds the bucket identifier for one partition path from the table's consistent-hashing
     * metadata (via {@code ConsistentBucketIndexUtils.loadOrCreateMetadata}), overlaying any child
     * nodes registered through {@link #addHashingChildrenNodes}.
     */
    private ConsistentBucketIdentifier getBucketIdentifier(String partition) {
        HoodieSparkConsistentBucketIndex index = (HoodieSparkConsistentBucketIndex) this.table.getIndex();
        HoodieConsistentHashingMetadata metadata = ConsistentBucketIndexUtils.loadOrCreateMetadata(this.table, partition, index.getNumBuckets());
        List<ConsistentHashingNode> childrenNodes = this.hashingChildrenNodes.get(partition);
        if (childrenNodes != null) {
            metadata.setChildrenNodes(childrenNodes);
        }
        return new ConsistentBucketIdentifier(metadata);
    }

    /**
     * Shuffles {@code rows} so that each Spark partition holds the rows routed to one bucket index,
     * then applies the configured within-partition sort.
     *
     * @param rows             input rows
     * @param outputPartitions ignored: the partition count is the number of buckets across all
     *                         partition paths present in {@code rows}
     * @return the repartitioned (and possibly sorted) rows, with the original schema
     * @throws HoodieException if a record-key sort is required but meta fields are disabled
     */
    @Override
    public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputPartitions) {
        JavaRDD<Row> rowJavaRDD = rows.toJavaRDD();
        this.prepareRepartition(rowJavaRDD);
        // Snapshot the bucket count so the returned Dataset does not depend on later mutations of fileIdPfxList.
        Partitioner partitioner = new BucketIdPartitioner(this.fileIdPfxList.size());
        Dataset<Row> partitionedRows = rows.sparkSession().createDataFrame(
            rowJavaRDD.mapToPair(row -> new Tuple2<>(this.getBucketId(row), row))
                .partitionBy(partitioner)
                .values(),
            rows.schema());
        if (this.hasSortColumns()) {
            Column[] sortColumns = Arrays.stream(this.sortColumnNames).map(Column::new).toArray(Column[]::new);
            partitionedRows = partitionedRows.sortWithinPartitions(sortColumns);
        } else if (this.isRecordKeySortRequested()) {
            if (!this.populateMetaFields) {
                throw new HoodieException("Sorting by record key for consistent hashing bucket index requires meta-fields to be enabled");
            }
            partitionedRows = partitionedRows.sortWithinPartitions(HoodieRecord.RECORD_KEY_METADATA_FIELD);
        }
        return partitionedRows;
    }

    /**
     * Rebuilds the per-partition bucket identifiers, the file-id-prefix-to-index map and the
     * file-id prefix list for the partition paths present in {@code rows}.
     */
    private void prepareRepartition(JavaRDD<Row> rows) {
        this.partitionToIdentifier = this.initializeBucketIdentifier(rows);
        this.partitionToFileIdPfxIdxMap = ConsistentBucketIndexUtils.generatePartitionToFileIdPfxIdxMap(this.partitionToIdentifier);
        // Start from scratch so repeated calls do not inflate the partition count.
        this.fileIdPfxList.clear();
        this.partitionToIdentifier.values().forEach(identifier -> this.fileIdPfxList.addAll(
            identifier.getNodes().stream().map(ConsistentHashingNode::getFileIdPrefix).collect(Collectors.toList())));
    }

    /** Collects the distinct partition paths of {@code rows} to the driver and builds an identifier for each. */
    private Map<String, ConsistentBucketIdentifier> initializeBucketIdentifier(JavaRDD<Row> rows) {
        return rows.map(this.extractor::getPartitionPath)
            .distinct()
            .collect()
            .stream()
            .collect(Collectors.toMap(p -> p, this::getBucketIdentifier));
    }

    /**
     * Registers child hashing nodes for {@code partition}; they are applied the next time that
     * partition's bucket identifier is built.
     *
     * @param partition partition path
     * @param nodes     child nodes; none may be tagged {@code NORMAL}
     */
    @Override
    public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode> nodes) {
        ValidationUtils.checkState(nodes.stream().noneMatch(n -> n.getTag() == ConsistentHashingNode.NodeTag.NORMAL), "children nodes should not be tagged as NORMAL");
        this.hashingChildrenNodes.put(partition, nodes);
    }

    /**
     * Reports whether {@link #repartitionRecords} sorts within partitions: true when sort columns are
     * configured, or when the table or bulk-insert sort mode requests a record-key sort.
     */
    @Override
    public boolean arePartitionRecordsSorted() {
        return this.hasSortColumns() || this.isRecordKeySortRequested();
    }

    /** True when the user configured at least one sort column. */
    private boolean hasSortColumns() {
        return this.sortColumnNames.length > 0;
    }

    /** True when the table requires sorted records or the bulk-insert sort mode is not NONE. */
    private boolean isRecordKeySortRequested() {
        return this.table.requireSortedRecords() || this.table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
    }

    /** Maps a row to the Spark partition index of the bucket its record key falls into. */
    private int getBucketId(Row row) {
        String recordKey = this.extractor.getRecordKey(row);
        String partitionPath = this.extractor.getPartitionPath(row);
        ConsistentHashingNode node = this.partitionToIdentifier.get(partitionPath).getBucket(recordKey, this.indexKeyFields);
        return this.partitionToFileIdPfxIdxMap.get(partitionPath).get(node.getFileIdPrefix());
    }

    /**
     * Spark partitioner whose keys are already partition indices; the partition count is fixed at
     * construction time.
     */
    private static final class BucketIdPartitioner extends Partitioner {
        private static final long serialVersionUID = 1L;

        private final int numBuckets;

        BucketIdPartitioner(int numBuckets) {
            this.numBuckets = numBuckets;
        }

        @Override
        public int numPartitions() {
            return this.numBuckets;
        }

        @Override
        public int getPartition(Object key) {
            return (Integer) key;
        }
    }
}

