package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
import org.apache.beam.runners.spark.structuredstreaming.translation.utils.ScalaInterop;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;
import scala.collection.TraversableOnce;

/* loaded from: input_file:org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.class */
class CombinePerKeyTranslatorBatch<K, InT, AccT, OutT> extends TransformTranslator<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>, Combine.PerKey<K, InT, OutT>> {
    /* JADX INFO: Access modifiers changed from: package-private */
    public CombinePerKeyTranslatorBatch() {
        super(0.2f);
    }

    @Override // org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator
    public void translate(Combine.PerKey<K, InT, OutT> perKey, TransformTranslator<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>, Combine.PerKey<K, InT, OutT>>.Context context) {
        Dataset map;
        WindowingStrategy windowingStrategy = context.getInput().getWindowingStrategy();
        Combine.CombineFn<InT, AccT, OutT> combineFn = (Combine.CombineFn) perKey.getFn();
        KvCoder<K, V> kvCoder = (KvCoder) context.getInput().getCoder();
        KvCoder coder = context.getOutput().getCoder();
        Encoder keyEncoderOf = context.keyEncoderOf(kvCoder);
        Encoder encoderOf = context.encoderOf(kvCoder);
        Encoder<WindowedValue<T>> windowedEncoder = context.windowedEncoder((Coder) coder);
        Encoder<AccT> accumEncoder = accumEncoder(combineFn, kvCoder.getValueCoder(), context);
        boolean eligibleForGlobalGroupBy = GroupByKeyHelpers.eligibleForGlobalGroupBy(windowingStrategy, true);
        boolean eligibleForGroupByWindow = GroupByKeyHelpers.eligibleForGroupByWindow(windowingStrategy, true);
        if (eligibleForGlobalGroupBy || eligibleForGroupByWindow) {
            Aggregator value = Aggregators.value(combineFn, (v0) -> {
                return v0.getValue();
            }, accumEncoder, context.valueEncoderOf(coder));
            if (eligibleForGlobalGroupBy) {
                map = context.getDataset(context.getInput()).groupByKey(GroupByKeyHelpers.valueKey(), keyEncoderOf).mapValues(GroupByKeyHelpers.value(), encoderOf).agg(value.toColumn()).map(globalKV(), windowedEncoder);
            } else {
                Encoder tupleEncoder = context.tupleEncoder(context.windowEncoder(), keyEncoderOf);
                map = context.getDataset(context.getInput()).flatMap(GroupByKeyHelpers.explodeWindowedKey(GroupByKeyHelpers.value()), context.tupleEncoder(tupleEncoder, encoderOf)).groupByKey(ScalaInterop.fun1((v0) -> {
                    return v0._1();
                }), tupleEncoder).mapValues(ScalaInterop.fun1((v0) -> {
                    return v0._2();
                }), encoderOf).agg(value.toColumn()).map(GroupByKeyHelpers.windowedKV(), windowedEncoder);
            }
        } else {
            map = context.getDataset(context.getInput()).groupByKey(GroupByKeyHelpers.valueKey(), keyEncoderOf).agg(Aggregators.windowedValue(combineFn, GroupByKeyHelpers.valueValue(), windowingStrategy, context.windowEncoder(), accumEncoder, context.windowedEncoder(coder.getValueCoder())).toColumn()).flatMap(explodeWindows(), windowedEncoder);
        }
        context.putDataset(context.getOutput(), map);
    }

    private static <K, V> ScalaInterop.Fun1<Tuple2<K, Collection<WindowedValue<V>>>, TraversableOnce<WindowedValue<KV<K, V>>>> explodeWindows() {
        return tuple2 -> {
            return ScalaInterop.scalaIterator((Iterable) tuple2._2).map(windowedValue -> {
                return windowedValue.withValue(KV.of(tuple2._1, windowedValue.getValue()));
            });
        };
    }

    private static <K, V> ScalaInterop.Fun1<Tuple2<K, V>, WindowedValue<KV<K, V>>> globalKV() {
        return tuple2 -> {
            return WindowedValue.valueInGlobalWindow(KV.of(tuple2._1, tuple2._2));
        };
    }

    private Encoder<AccT> accumEncoder(Combine.CombineFn<InT, AccT, OutT> combineFn, Coder<InT> coder, TransformTranslator<PCollection<KV<K, InT>>, PCollection<KV<K, OutT>>, Combine.PerKey<K, InT, OutT>>.Context context) {
        try {
            return (Encoder<AccT>) context.encoderOf(combineFn.getAccumulatorCoder(context.getInput().getPipeline().getCoderRegistry(), coder));
        } catch (CannotProvideCoderException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1279701854:
                if (implMethodName.equals("lambda$explodeWindows$b611da65$1")) {
                    z = 3;
                    break;
                }
                break;
            case 2994:
                if (implMethodName.equals("_1")) {
                    z = true;
                    break;
                }
                break;
            case 2995:
                if (implMethodName.equals("_2")) {
                    z = 2;
                    break;
                }
                break;
            case 1724396661:
                if (implMethodName.equals("lambda$globalKV$c7895d8c$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop$Fun1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/values/KV") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop$Fun1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("scala/Tuple2") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0._1();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop$Fun1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("scala/Tuple2") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0._2();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop$Fun1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lscala/collection/TraversableOnce;")) {
                    return tuple2 -> {
                        return ScalaInterop.scalaIterator((Iterable) tuple2._2).map(windowedValue -> {
                            return windowedValue.withValue(KV.of(tuple2._1, windowedValue.getValue()));
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/utils/ScalaInterop$Fun1") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Lorg/apache/beam/sdk/util/WindowedValue;")) {
                    return tuple22 -> {
                        return WindowedValue.valueInGlobalWindow(KV.of(tuple22._1, tuple22._2));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
