/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader;
import org.apache.hudi.hadoop.realtime.RealtimeSplit;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.Types;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Applies Hudi schema evolution (the table's {@link InternalSchema}) to a Hive read of one input split.
 *
 * <p>For plain parquet reads ({@link #doEvolutionForParquetFormat()}) it rewrites the column names, column
 * types and the serialized pushed-down filter stored in the {@link JobConf}. For realtime (MOR) reads
 * ({@link #doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader)}) it configures the writer, reader
 * and Hive schemas of the record reader.
 */
public class SchemaEvolutionContext {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaEvolutionContext.class);
    // NOTE(review): the constant is named "..._NAMES_..." and holds the read column NAMES
    // (see getRequireColumn), but the key literal ends in "ids". The literal is the runtime
    // JobConf key, so it is intentionally left unchanged.
    private static final String HIVE_TMP_READ_COLUMN_NAMES_CONF_STR = "hive.tmp.io.file.readcolumn.ids";
    private static final String HIVE_TMP_COLUMNS = "hive.tmp.columns";
    private static final String HIVE_EVOLUTION_ENABLE = "hudi.hive.schema.evolution";
    private final InputSplit split;
    private final JobConf job;
    // May be null: when schema evolution is disabled, or when no meta client could be built for the split.
    private final HoodieTableMetaClient metaClient;
    public Option<InternalSchema> internalSchemaOption;

    /**
     * Creates a context that builds its own meta client from the split's path.
     *
     * @param split the split being read
     * @param job   the job configuration, which this context reads and mutates
     * @throws IOException if resolving the table schema fails
     */
    public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
        this(split, job, Option.empty());
    }

    /**
     * Creates a context for {@code split}.
     *
     * <p>When {@code hudi.hive.schema.evolution} is {@code false} in {@code job}, no schema is resolved and
     * {@link #internalSchemaOption} is empty.
     *
     * @param split            the split being read
     * @param job              the job configuration, which this context reads and mutates
     * @param metaClientOption an existing meta client to reuse; when empty one is built from the split path
     * @throws IOException if resolving the table schema fails
     */
    public SchemaEvolutionContext(InputSplit split, JobConf job, Option<HoodieTableMetaClient> metaClientOption) throws IOException {
        this.split = split;
        this.job = job;
        if (!job.getBoolean(HIVE_EVOLUTION_ENABLE, true)) {
            LOG.info("Schema evolution is disabled for split: {}", split);
            this.internalSchemaOption = Option.empty();
            this.metaClient = null;
            return;
        }
        this.metaClient = metaClientOption.isPresent() ? metaClientOption.get() : this.setUpHoodieTableMetaClient();
        this.internalSchemaOption = this.getInternalSchemaFromCache();
    }

    /**
     * Resolves the table's internal schema.
     *
     * <p>A JSON copy cached in the job configuration (key prefix
     * {@code hudi.hive.internal.schema.cache.key.prefix}) is preferred. When no cache entry exists, the schema
     * is read from commit metadata through {@link TableSchemaResolver}.
     *
     * @return the internal schema, or empty when none is available (including when there is no cache entry
     *         and no meta client could be built for this split)
     * @throws IOException if resolving the table path fails
     */
    public Option<InternalSchema> getInternalSchemaFromCache() throws IOException {
        Option<InternalSchema> internalSchemaOpt = this.getCachedData("hudi.hive.internal.schema.cache.key.prefix", SerDeHelper::fromJson);
        if (internalSchemaOpt != null) {
            return internalSchemaOpt;
        }
        if (this.metaClient == null) {
            // setUpHoodieTableMetaClient() already logged why; TableSchemaResolver cannot work without a client.
            LOG.warn("No cached internal schema and no meta client available for split: {}", this.split);
            return Option.empty();
        }
        return new TableSchemaResolver(this.metaClient).getTableInternalSchemaFromCommitMetadata();
    }

    /**
     * Resolves the table's Avro schema.
     *
     * <p>A JSON copy cached in the job configuration (key prefix {@code hudi.hive.schema.cache.key.prefix})
     * is preferred. Otherwise the schema is read through {@link TableSchemaResolver}.
     *
     * @return the table Avro schema
     * @throws HoodieValidationException if the cache key exists but yields no schema (empty value or parse failure)
     * @throws HoodieException           if there is no cache entry and no meta client for this split
     * @throws Exception                 propagated from {@link TableSchemaResolver#getTableAvroSchema()}
     */
    public Schema getAvroSchemaFromCache() throws Exception {
        Option<Schema> avroSchemaOpt = this.getCachedData("hudi.hive.schema.cache.key.prefix",
                json -> Option.ofNullable(new Schema.Parser().parse(json)));
        if (avroSchemaOpt == null) {
            if (this.metaClient == null) {
                throw new HoodieException("Cannot resolve the avro schema: no cached schema and no meta client for split: " + this.split);
            }
            return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
        }
        return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The avro schema cache should always be set up together with the internal schema cache"));
    }

    /**
     * Looks up a cached value stored in the job configuration under {@code keyPrefix + "." + tablePathUri}.
     *
     * @return {@code null} when the key is absent (caller should fall back to the source of truth);
     *         empty when the table path cannot be resolved, the cached value is empty, or parsing fails;
     *         otherwise the parsed value
     * @throws IOException if resolving the table path fails
     */
    @Nullable
    private <T> Option<T> getCachedData(String keyPrefix, Function<String, Option<T>> parser) throws IOException {
        Option<StoragePath> tablePath = this.getTablePath(this.job, this.split);
        if (!tablePath.isPresent()) {
            return Option.empty();
        }
        String cacheKey = keyPrefix + "." + tablePath.get().toUri();
        String cachedJson = this.job.get(cacheKey);
        if (cachedJson == null) {
            return null;
        }
        if (cachedJson.isEmpty()) {
            return Option.empty();
        }
        try {
            return parser.apply(cachedJson);
        } catch (Exception e) {
            LOG.warn("Failed to parse data from cache with key: {}", cacheKey, e);
            return Option.empty();
        }
    }

    /**
     * Resolves the Hudi table base path for a {@link FileSplit}.
     *
     * @return the table path, or empty for non-file splits or when {@link TablePathUtils} finds none
     * @throws IOException if the split's file system cannot be obtained
     */
    private Option<StoragePath> getTablePath(JobConf job, InputSplit split) throws IOException {
        if (split instanceof FileSplit) {
            Path path = ((FileSplit) split).getPath();
            FileSystem fs = path.getFileSystem(job);
            HoodieHadoopStorage storage = new HoodieHadoopStorage(fs);
            return TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
        }
        return Option.empty();
    }

    /**
     * Builds a meta client for the table containing this split.
     *
     * @return the meta client, or {@code null} (after logging a warning) when the split is not part of a valid
     *         Hudi table or the client cannot be built
     */
    private HoodieTableMetaClient setUpHoodieTableMetaClient() {
        try {
            Option<StoragePath> tablePath = this.getTablePath(this.job, this.split);
            if (!tablePath.isPresent()) {
                LOG.warn("Not a valid hoodie table, table path: {}", this.describeSplit());
                return null;
            }
            return HoodieTableMetaClient.builder()
                    .setBasePath(tablePath.get().toString())
                    .setConf(HadoopFSUtils.getStorageConfWithCopy(this.job))
                    .build();
        } catch (Exception e) {
            // Never re-cast the split here: a failed cast must not escape from the error path.
            LOG.warn("Not a valid hoodie table, table path: {}", this.describeSplit(), e);
            return null;
        }
    }

    /** Returns the split's path for {@link FileSplit}s, or the split itself otherwise; used for logging only. */
    private Object describeSplit() {
        return this.split instanceof FileSplit ? ((FileSplit) this.split).getPath() : this.split;
    }

    /**
     * Configures a realtime record reader with evolved schemas.
     *
     * <p>The writer schema is the full internal schema plus partition fields. The reader schema is the
     * internal schema pruned to the required columns. The Hive schema is ordered per {@code hive.tmp.columns}.
     * Does nothing (besides a warning) for non-realtime splits, and nothing when no internal schema is present.
     * On success, {@link #internalSchemaOption} is replaced by the pruned schema.
     *
     * @param realtimeRecordReader the reader to configure
     * @throws Exception if schema resolution or conversion fails
     */
    public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realtimeRecordReader) throws Exception {
        if (!(this.split instanceof RealtimeSplit)) {
            LOG.warn("expect realtime split for mor table, but find other type split {}", this.split);
            return;
        }
        if (!this.internalSchemaOption.isPresent()) {
            return;
        }
        InternalSchema tableInternalSchema = this.internalSchemaOption.get();
        Schema tableAvroSchema = this.getAvroSchemaFromCache();
        List<String> requiredColumns = getRequireColumn(this.job);
        InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(tableInternalSchema, requiredColumns);
        // Partition columns are stored as a "/"-separated list.
        String partitionFields = this.job.get("partition_columns", "");
        List<String> partitioningFields = partitionFields.isEmpty()
                ? new ArrayList<>()
                : Arrays.stream(partitionFields.split("/")).collect(Collectors.toList());
        Schema writerSchema = AvroInternalSchemaConverter.convert(tableInternalSchema, tableAvroSchema.getName());
        writerSchema = HoodieRealtimeRecordReaderUtils.addPartitionFields(writerSchema, partitioningFields);
        Map<String, Schema.Field> schemaFieldsMap = HoodieRealtimeRecordReaderUtils.getNameToFieldMap(writerSchema);
        Schema hiveSchema = realtimeRecordReader.constructHiveOrderedSchema(writerSchema, schemaFieldsMap, this.job.get(HIVE_TMP_COLUMNS));
        Schema readerSchema = AvroInternalSchemaConverter.convert(prunedInternalSchema, tableAvroSchema.getName());
        realtimeRecordReader.setWriterSchema(writerSchema);
        realtimeRecordReader.setReaderSchema(readerSchema);
        realtimeRecordReader.setHiveSchema(hiveSchema);
        this.internalSchemaOption = Option.of(prunedInternalSchema);
        RealtimeSplit realtimeSplit = (RealtimeSplit) this.split;
        LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
                realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns);
    }

    /**
     * Rewrites the job's column names, column types and pushed-down filter for a parquet (file) split.
     *
     * <p>The table's internal schema is merged with the schema of the commit that wrote the split's file.
     * This only runs when an internal schema is present and at least one non-empty column is projected.
     * Sets {@code hudi.hive.schema.evolution} to {@code true} whenever an internal schema is present.
     */
    public void doEvolutionForParquetFormat() {
        if (!this.internalSchemaOption.isPresent()) {
            return;
        }
        this.job.setBoolean(HIVE_EVOLUTION_ENABLE, true);
        Path finalPath = ((FileSplit) this.split).getPath();
        List<String> requiredColumns = getRequireColumn(this.job);
        // No projected columns (presumably count(*)/count(1)-style queries): nothing to evolve.
        boolean noProjectedColumns = requiredColumns.isEmpty()
                || (requiredColumns.size() == 1 && requiredColumns.get(0).isEmpty());
        if (noProjectedColumns) {
            return;
        }
        InternalSchema querySchema = InternalSchemaUtils.pruneInternalSchema(this.internalSchemaOption.get(), requiredColumns);
        long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
        InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, this.metaClient);
        InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
        List<Types.Field> fields = mergedInternalSchema.columns();
        this.setColumnNameList(this.job, fields);
        this.setColumnTypeList(this.job, fields);
        this.pushDownFilter(this.job, querySchema, fileSchema);
    }

    /**
     * Rewrites {@code columns.types} so that every projected, non-primitive column gets a Hive type rebuilt
     * from the evolved Hudi field type. Primitive column types are left untouched.
     *
     * @param job    job configuration; reads {@code columns.types} and {@code hive.io.file.readcolumn.ids},
     *               writes {@code columns.types}
     * @param fields the evolved projected fields, positionally aligned with {@code hive.io.file.readcolumn.ids}
     * @throws HoodieException if the number of read column ids differs from {@code fields.size()}
     */
    public void setColumnTypeList(JobConf job, List<Types.Field> fields) {
        ArrayList<TypeInfo> fullTypeInfos = TypeInfoUtils.getTypeInfosFromTypeString(job.get("columns.types"));
        List<Integer> readColumnIds = Arrays.stream(job.get("hive.io.file.readcolumn.ids").split(","))
                .map(Integer::parseInt)
                .collect(Collectors.toList());
        if (readColumnIds.size() != fields.size()) {
            throw new HoodieException(String.format("The size of hive.io.file.readcolumn.ids: %s is not equal to projection columns: %s", job.get("hive.io.file.readcolumn.ids"), fields.stream().map(Types.Field::name).collect(Collectors.joining(","))));
        }
        // Compute every new type before mutating fullTypeInfos so each lookup sees the original types.
        List<TypeInfo> fieldTypes = new ArrayList<>(readColumnIds.size());
        for (int i = 0; i < readColumnIds.size(); ++i) {
            // Re-parse the qualified name to obtain a fresh TypeInfo that constructHiveSchemaFromType may modify.
            String qualifiedName = fullTypeInfos.get(readColumnIds.get(i)).getQualifiedName();
            TypeInfo typeInfo = TypeInfoUtils.getTypeInfosFromTypeString(qualifiedName).get(0);
            fieldTypes.add(this.constructHiveSchemaFromType(fields.get(i).type(), typeInfo));
        }
        for (int i = 0; i < readColumnIds.size(); ++i) {
            TypeInfo typeInfo = fieldTypes.get(i);
            if (!(typeInfo instanceof PrimitiveTypeInfo)) {
                fullTypeInfos.set(readColumnIds.get(i), typeInfo);
            }
        }
        List<String> fullColTypeList = TypeInfoUtils.getTypeStringsFromTypeInfo(fullTypeInfos);
        job.set("columns.types", String.join(",", fullColTypeList));
    }

    /**
     * Builds a Hive {@link TypeInfo} for the Hudi {@code type}, using {@code typeInfo} (the current Hive type
     * of the column) as the template. Struct fields take the Hudi field names; primitive types return
     * {@code typeInfo} unchanged. Note: for ARRAY the given {@code typeInfo} is modified in place and returned.
     *
     * @throws UnsupportedOperationException for TIME or unknown Hudi types
     */
    private TypeInfo constructHiveSchemaFromType(Type type, TypeInfo typeInfo) {
        switch (type.typeId()) {
            case RECORD: {
                Types.RecordType record = (Types.RecordType) type;
                List<Types.Field> fields = record.fields();
                ArrayList<TypeInfo> fieldTypes = new ArrayList<>();
                ArrayList<String> fieldNames = new ArrayList<>();
                for (int index = 0; index < fields.size(); ++index) {
                    StructTypeInfo hiveStruct = (StructTypeInfo) typeInfo;
                    TypeInfo subTypeInfo = this.getSchemaSubTypeInfo(hiveStruct.getAllStructFieldTypeInfos().get(index), fields.get(index).type());
                    fieldTypes.add(subTypeInfo);
                    fieldNames.add(fields.get(index).name());
                }
                StructTypeInfo structTypeInfo = new StructTypeInfo();
                structTypeInfo.setAllStructFieldNames(fieldNames);
                structTypeInfo.setAllStructFieldTypeInfos(fieldTypes);
                return structTypeInfo;
            }
            case ARRAY: {
                ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
                Types.ArrayType array = (Types.ArrayType) type;
                TypeInfo subTypeInfo = this.getSchemaSubTypeInfo(listTypeInfo.getListElementTypeInfo(), array.elementType());
                listTypeInfo.setListElementTypeInfo(subTypeInfo);
                return listTypeInfo;
            }
            case MAP: {
                Types.MapType map = (Types.MapType) type;
                MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
                TypeInfo keyType = this.getSchemaSubTypeInfo(mapTypeInfo.getMapKeyTypeInfo(), map.keyType());
                TypeInfo valueType = this.getSchemaSubTypeInfo(mapTypeInfo.getMapValueTypeInfo(), map.valueType());
                MapTypeInfo mapType = new MapTypeInfo();
                mapType.setMapKeyTypeInfo(keyType);
                mapType.setMapValueTypeInfo(valueType);
                return mapType;
            }
            case BOOLEAN:
            case INT:
            case LONG:
            case FLOAT:
            case DOUBLE:
            case DATE:
            case TIMESTAMP:
            case STRING:
            case UUID:
            case FIXED:
            case BINARY:
            case DECIMAL: {
                return typeInfo;
            }
            case TIME: {
                throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
            }
            default:
                break;
        }
        LOG.error("cannot convert unknown type: {} to Hive", type);
        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
    }

    /**
     * Returns a fresh copy of the nested Hive type, rebuilt from the Hudi type when it is not primitive.
     *
     * @param hiveTypeInfo the nested Hive type (re-parsed from its qualified name to avoid sharing)
     * @param hudiType     the corresponding nested Hudi type
     */
    private TypeInfo getSchemaSubTypeInfo(TypeInfo hiveTypeInfo, Type hudiType) {
        TypeInfo subTypeInfo = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeInfo.getQualifiedName()).get(0);
        return subTypeInfo instanceof PrimitiveTypeInfo ? subTypeInfo : this.constructHiveSchemaFromType(hudiType, subTypeInfo);
    }

    /**
     * Renames column references in the serialized pushed-down filter so that they match the file schema.
     * Rewrites {@code hive.io.filter.text} and {@code hive.io.filter.expr.serialized}; does nothing when no
     * serialized filter is set.
     */
    private void pushDownFilter(JobConf job, InternalSchema querySchema, InternalSchema fileSchema) {
        String filterExprSerialized = job.get("hive.io.filter.expr.serialized");
        if (filterExprSerialized == null) {
            return;
        }
        ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression(filterExprSerialized);
        // Breadth-first walk over the whole expression tree; column nodes are renamed in place.
        LinkedList<ExprNodeDesc> pending = new LinkedList<>();
        pending.add(filterExpr);
        while (!pending.isEmpty()) {
            ExprNodeDesc expr = pending.poll();
            if (expr instanceof ExprNodeColumnDesc) {
                ExprNodeColumnDesc columnExpr = (ExprNodeColumnDesc) expr;
                columnExpr.setColumn(InternalSchemaUtils.reBuildFilterName(columnExpr.getColumn(), fileSchema, querySchema));
            }
            List<ExprNodeDesc> children = expr.getChildren();
            if (children != null) {
                pending.addAll(children);
            }
        }
        String filterText = filterExpr.getExprString();
        String serializedFilterExpr = SerializationUtilities.serializeExpression(filterExpr);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Pushdown initiated with filterText = {}, filterExpr = {}, serializedFilterExpr = {}",
                    filterText, filterExpr, serializedFilterExpr);
        }
        job.set("hive.io.filter.text", filterText);
        job.set("hive.io.filter.expr.serialized", serializedFilterExpr);
    }

    /**
     * Rewrites {@code hive.io.file.readcolumn.names} to the evolved field names. It also replaces the
     * corresponding entries of {@code columns} (positions taken from {@code hive.io.file.readcolumn.ids}).
     * Does nothing when {@code fields} is null or its size differs from the number of read column ids.
     */
    private void setColumnNameList(JobConf job, List<Types.Field> fields) {
        if (fields == null) {
            return;
        }
        List<String> readColumnIds = Arrays.asList(job.get("hive.io.file.readcolumn.ids").split(","));
        if (fields.size() != readColumnIds.size()) {
            return;
        }
        List<String> fullColNameList = new ArrayList<>(Arrays.asList(job.get("columns").split(",")));
        List<String> readColumnNames = new ArrayList<>(fields.size());
        for (int index = 0; index < fields.size(); ++index) {
            String colName = fields.get(index).name();
            readColumnNames.add(colName);
            fullColNameList.set(Integer.parseInt(readColumnIds.get(index)), colName);
        }
        job.set("hive.io.file.readcolumn.names", String.join(",", readColumnNames));
        job.set("columns", String.join(",", fullColNameList));
    }

    /**
     * Returns the columns requested by the query, split on ",".
     *
     * <p>On first use this snapshots {@code hive.io.file.readcolumn.names} and {@code columns} into private
     * "tmp" keys, because later schema evolution rewrites the originals. Absent source values are not copied:
     * {@link JobConf#set} rejects {@code null}. An absent value is treated like an empty one, yielding
     * {@code [""]}.
     *
     * @param jobConf job configuration; may be updated with the snapshot keys
     * @return a new mutable list of required column names
     */
    public static List<String> getRequireColumn(JobConf jobConf) {
        String readColumnNames = jobConf.get("hive.io.file.readcolumn.names");
        if (StringUtils.isNullOrEmpty(jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR)) && readColumnNames != null) {
            jobConf.set(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR, readColumnNames);
        }
        String allColumns = jobConf.get("columns");
        if (StringUtils.isNullOrEmpty(jobConf.get(HIVE_TMP_COLUMNS)) && allColumns != null) {
            jobConf.set(HIVE_TMP_COLUMNS, allColumns);
        }
        String tableColumnString = jobConf.get(HIVE_TMP_READ_COLUMN_NAMES_CONF_STR, "");
        return new ArrayList<>(Arrays.asList(tableColumnString.split(",")));
    }
}

