/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.dataflow.spark.streaming.hmap;

import java.util.LinkedList;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.talend.bigdata.dataflow.DataFlowPipelineBuilder;
import org.talend.bigdata.dataflow.hmap.HMapSpec;
import org.talend.bigdata.dataflow.spark.common.hmap.MapPairToPairWithCompositeKeyFunction;
import org.talend.bigdata.dataflow.spark.common.hmap.MapPairToPairWithNullKeyFunction;
import org.talend.bigdata.dataflow.spark.common.hmap.MapPairToPairWithSingleKeyFunction;
import org.talend.bigdata.dataflow.spark.common.hmap.Multiplexor;
import org.talend.bigdata.dataflow.spark.streaming.SparkStreamingDataFlow;

public class SparkHMapJoin
extends org.talend.bigdata.dataflow.spark.batch.hmap.SparkHMapJoin<SparkStreamingDataFlow> {
    private static <INPUT, OUTPUT> JavaPairDStream<INPUT, OUTPUT> buildUnionBatchAndStreaming(JavaStreamingContext ssc, JavaPairRDD<INPUT, OUTPUT> dataRdd, JavaPairDStream<INPUT, OUTPUT> dataStream) {
        if (dataRdd != null) {
            if (dataStream == null) {
                JavaPairDStream emptyData = JavaPairDStream.fromJavaDStream((JavaDStream)ssc.queueStream(new LinkedList(), true));
                return emptyData.transformToPair(new UnionFunction<INPUT, OUTPUT>(dataRdd));
            }
            return dataStream.transformToPair(new UnionFunction<INPUT, OUTPUT>(dataRdd));
        }
        return dataStream;
    }

    @Override
    public void build(SparkStreamingDataFlow sdf, HMapSpec spec, DataFlowPipelineBuilder.Pipeline pa, Iterable<DataFlowPipelineBuilder.Pipeline> incoming) {
        int mainDataCount = 0;
        JavaPairDStream mainRdd = null;
        JavaPairDStream mainRddTagged = null;
        JavaPairDStream mainDStream = null;
        JavaPairDStream mainDStreamTagged = null;
        int lookupDataCount = 0;
        JavaPairDStream lookupRdd = null;
        JavaPairDStream lookupRddTagged = null;
        JavaPairDStream lookupDStream = null;
        JavaPairDStream lookupDStreamTagged = null;
        for (DataFlowPipelineBuilder.Pipeline incomingPipeline : incoming) {
            JavaPairDStream tagged;
            JavaPairDStream in;
            String inTag = incomingPipeline.getTag();
            HMapSpec.InputDef inDef = spec.getInput(inTag);
            if (sdf.getIntermediatePairDStream(inDef.getTag()) != null) {
                in = sdf.getIntermediatePairDStream(inTag);
                if (!sdf.getParameter(inTag, "retain_key", false).booleanValue()) {
                    in = spec.getJoinKeySize() == 1 ? in.mapToPair(new MapPairToPairWithSingleKeyFunction(inDef.getOrder(), spec)) : (spec.getJoinKeySize() == 0 ? in.mapToPair(new MapPairToPairWithNullKeyFunction()) : in.mapToPair(new MapPairToPairWithCompositeKeyFunction(inDef.getOrder(), spec)));
                }
                if (inDef.isMain()) {
                    ++mainDataCount;
                    mainDStream = in;
                    tagged = in.mapValues(new Multiplexor((byte)inDef.getOrder()));
                    mainDStreamTagged = mainDStreamTagged == null ? tagged : mainDStreamTagged.union(tagged);
                    continue;
                }
                ++lookupDataCount;
                lookupDStream = in;
                tagged = in.mapValues(new Multiplexor((byte)inDef.getOrder()));
                lookupDStreamTagged = lookupDStreamTagged == null ? tagged : lookupDStreamTagged.union(tagged);
                continue;
            }
            in = sdf.getIntermediatePairRDD(inTag);
            if (!sdf.getParameter(inTag, "retain_key", false).booleanValue()) {
                in = spec.getJoinKeySize() == 1 ? in.mapToPair(new MapPairToPairWithSingleKeyFunction(inDef.getOrder(), spec)) : (spec.getJoinKeySize() == 0 ? in.mapToPair(new MapPairToPairWithNullKeyFunction()) : in.mapToPair(new MapPairToPairWithCompositeKeyFunction(inDef.getOrder(), spec)));
            }
            if (inDef.isMain()) {
                ++mainDataCount;
                mainRdd = in;
                tagged = in.mapValues(new Multiplexor((byte)inDef.getOrder()));
                mainRddTagged = mainRddTagged == null ? tagged : mainRddTagged.union((JavaPairRDD)tagged);
                continue;
            }
            ++lookupDataCount;
            lookupRdd = in;
            tagged = in.mapValues(new Multiplexor((byte)inDef.getOrder()));
            lookupRddTagged = lookupRddTagged == null ? tagged : lookupRddTagged.union((JavaPairRDD)tagged);
        }
        if (mainDStream == null && lookupDStream != null) {
            throw new RuntimeException("Cannot join streaming data to main batch input.");
        }
        if (mainDStream == null && lookupDStream == null) {
            sdf.putIntermediatePairRDD(pa.getTag(), SparkHMapJoin.buildBatchJoin(spec, spec.getJoinKeySize(), mainDataCount != 1, mainDataCount == 1 ? mainRdd : mainRddTagged, lookupDataCount != 1, lookupDataCount == 1 ? lookupRdd : lookupRddTagged, sdf.getParameter(pa.getTag(), "storage_level", null)));
        } else {
            sdf.putIntermediatePairDStream(pa.getTag(), SparkHMapJoin.buildStreamingJoin(spec, spec.getJoinKeySize(), mainDataCount != 1, mainDataCount == 1 ? SparkHMapJoin.buildUnionBatchAndStreaming(sdf.ssc(), mainRdd, mainDStream) : SparkHMapJoin.buildUnionBatchAndStreaming(sdf.ssc(), mainRddTagged, mainDStreamTagged), lookupDataCount != 1, lookupDataCount == 1 ? SparkHMapJoin.buildUnionBatchAndStreaming(sdf.ssc(), lookupRdd, lookupDStream) : SparkHMapJoin.buildUnionBatchAndStreaming(sdf.ssc(), lookupRddTagged, lookupDStreamTagged), sdf.getParameter(pa.getTag(), "storage_level", null)));
        }
    }

    public static class UnionFunction<K, V>
    implements Function<JavaPairRDD<K, V>, JavaPairRDD<K, V>> {
        private static final long serialVersionUID = 1L;
        private final JavaPairRDD<K, V> mStatic;

        UnionFunction(JavaPairRDD<K, V> staticData) {
            this.mStatic = staticData;
        }

        public JavaPairRDD<K, V> call(JavaPairRDD<K, V> in) {
            return this.mStatic.union(in);
        }
    }
}

