/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.common;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.data.HoodieAccumulator;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.function.SerializableBiFunction;
import org.apache.hudi.common.function.SerializableConsumer;
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
import org.apache.hudi.common.function.SerializablePairFunction;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.data.HoodieSparkLongAccumulator;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;

@ThreadSafe
public class HoodieSparkEngineContext
extends HoodieEngineContext {
    private final JavaSparkContext javaSparkContext;
    private final SQLContext sqlContext;
    private final Map<HoodieData.HoodieDataCacheKey, List<Integer>> cachedRddIds = new HashMap<HoodieData.HoodieDataCacheKey, List<Integer>>();

    public HoodieSparkEngineContext(JavaSparkContext jsc) {
        this(jsc, SQLContext.getOrCreate((SparkContext)jsc.sc()));
    }

    public HoodieSparkEngineContext(JavaSparkContext jsc, SQLContext sqlContext) {
        super(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration()), new SparkTaskContextSupplier());
        this.javaSparkContext = jsc;
        this.sqlContext = sqlContext;
    }

    public JavaSparkContext getJavaSparkContext() {
        return this.javaSparkContext;
    }

    public JavaSparkContext jsc() {
        return this.javaSparkContext;
    }

    public SQLContext getSqlContext() {
        return this.sqlContext;
    }

    public static JavaSparkContext getSparkContext(HoodieEngineContext context) {
        return ((HoodieSparkEngineContext)context).getJavaSparkContext();
    }

    @Override
    public HoodieAccumulator newAccumulator() {
        HoodieSparkLongAccumulator accumulator = HoodieSparkLongAccumulator.create();
        this.javaSparkContext.sc().register(accumulator.getAccumulator());
        return accumulator;
    }

    @Override
    public <T> HoodieData<T> emptyHoodieData() {
        return HoodieJavaRDD.of(this.javaSparkContext.emptyRDD());
    }

    @Override
    public boolean supportsFileGroupReader() {
        return true;
    }

    @Override
    public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
        return HoodieJavaRDD.of(this.javaSparkContext.parallelize(data, parallelism));
    }

    @Override
    public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
        return this.javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
    }

    @Override
    public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, SerializablePairFunction<I, K, V> mapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
        return this.javaSparkContext.parallelize(data, parallelism).mapToPair((PairFunction & Serializable)input -> {
            Pair pair = mapToPairFunc.call(input);
            return new Tuple2(pair.getLeft(), pair.getRight());
        }).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
    }

    @Override
    public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
        return this.javaSparkContext.parallelize(data.collect(Collectors.toList()), parallelism).mapPartitionsToPair((PairFlatMapFunction & Serializable)iterator2 -> flatMapToPairFunc.call((Iterator)iterator2).collect(Collectors.toList()).stream().map(e -> new Tuple2(e.getKey(), e.getValue())).iterator()).reduceByKey(reduceFunc::apply).map((Function & Serializable)e -> new ImmutablePair<Object, Object>(e._1, e._2)).collect().stream();
    }

    @Override
    public <I, K, V> List<V> reduceByKey(List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
        return this.javaSparkContext.parallelize(data, parallelism).mapToPair((PairFunction & Serializable)pair -> new Tuple2(pair.getLeft(), pair.getRight())).reduceByKey(reduceFunc::apply).map(Tuple2::_2).collect();
    }

    @Override
    public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
        return this.javaSparkContext.parallelize(data, parallelism).flatMap((FlatMapFunction & Serializable)x -> ((Stream)func.apply(x)).iterator()).collect();
    }

    @Override
    public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
        this.javaSparkContext.parallelize(data, parallelism).foreach(consumer::accept);
    }

    @Override
    public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
        if (Objects.nonNull(parallelism)) {
            return this.javaSparkContext.parallelize(data, parallelism.intValue()).mapToPair((PairFunction & Serializable)input -> {
                Pair pair = func.call(input);
                return new Tuple2(pair.getLeft(), pair.getRight());
            }).collectAsMap();
        }
        return this.javaSparkContext.parallelize(data).mapToPair((PairFunction & Serializable)input -> {
            Pair pair = func.call(input);
            return new Tuple2(pair.getLeft(), pair.getRight());
        }).collectAsMap();
    }

    @Override
    public void setProperty(EngineProperty key, String value) {
        if (!(key.equals((Object)EngineProperty.COMPACTION_POOL_NAME) || key.equals((Object)EngineProperty.CLUSTERING_POOL_NAME) || key.equals((Object)EngineProperty.DELTASYNC_POOL_NAME))) {
            throw new HoodieException("Unknown engine property :" + (Object)((Object)key));
        }
        this.javaSparkContext.setLocalProperty("spark.scheduler.pool", value);
    }

    @Override
    public Option<String> getProperty(EngineProperty key) {
        if (key == EngineProperty.EMBEDDED_SERVER_HOST) {
            return Option.ofNullable(this.javaSparkContext.getConf().get("spark.driver.host", null));
        }
        throw new HoodieException("Unknown engine property :" + (Object)((Object)key));
    }

    @Override
    public void setJobStatus(String activeModule, String activityDescription) {
        this.javaSparkContext.setJobGroup(activeModule, activityDescription);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void putCachedDataIds(HoodieData.HoodieDataCacheKey cacheKey, int ... ids) {
        Map<HoodieData.HoodieDataCacheKey, List<Integer>> map = this.cachedRddIds;
        synchronized (map) {
            this.cachedRddIds.putIfAbsent(cacheKey, new ArrayList());
            for (int id : ids) {
                this.cachedRddIds.get(cacheKey).add(id);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Integer> getCachedDataIds(HoodieData.HoodieDataCacheKey cacheKey) {
        Map<HoodieData.HoodieDataCacheKey, List<Integer>> map = this.cachedRddIds;
        synchronized (map) {
            return this.cachedRddIds.getOrDefault(cacheKey, Collections.emptyList());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<Integer> removeCachedDataIds(HoodieData.HoodieDataCacheKey cacheKey) {
        Map<HoodieData.HoodieDataCacheKey, List<Integer>> map = this.cachedRddIds;
        synchronized (map) {
            List<Integer> removed = this.cachedRddIds.remove(cacheKey);
            return removed == null ? Collections.emptyList() : removed;
        }
    }

    @Override
    public void cancelJob(String groupId) {
        this.javaSparkContext.cancelJobGroup(groupId);
    }

    @Override
    public void cancelAllJobs() {
        this.javaSparkContext.cancelAllJobs();
    }

    @Override
    public <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp) {
        Function2 & Serializable seqOpFunc = seqOp::apply;
        Function2 & Serializable combOpFunc = combOp::apply;
        return (O)HoodieJavaRDD.getJavaRDD(data).aggregate(zeroValue, (Function2)seqOpFunc, (Function2)combOpFunc);
    }

    public SparkConf getConf() {
        return this.javaSparkContext.getConf();
    }

    public Configuration hadoopConfiguration() {
        return this.javaSparkContext.hadoopConfiguration();
    }

    public <T> JavaRDD<T> emptyRDD() {
        return this.javaSparkContext.emptyRDD();
    }
}

