/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.dataflow.spark.streaming.io;

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.talend.bigdata.dataflow.DataFlow;
import org.talend.bigdata.dataflow.DataFlowBuilder;
import org.talend.bigdata.dataflow.io.IoSpec;
import org.talend.bigdata.dataflow.spark.common.SparkIoDataFlowBuilderFunctions;
import org.talend.bigdata.dataflow.spark.streaming.SparkStreamingDataFlow;

/**
 * Wires an {@link IoSpec} into a {@link SparkStreamingDataFlow}: input specs
 * become pair DStreams registered under the spec's tag, and output specs save
 * the pair DStream registered under the spec's tag as Hadoop (Avro) files.
 */
public class SparkIoDataFlowBuilder
        implements DataFlowBuilder.DataFlowWithSpecBuilder<IoSpec> {

    /**
     * Attaches the source or sink described by {@code mSpec} to {@code df}.
     *
     * <ul>
     *   <li>Input with a non-empty field delimiter: a text file stream on the
     *       spec's path, mapped through {@code FlatToAvroFunction}.</li>
     *   <li>Input with an empty field delimiter: a file stream read with
     *       {@code AvroKeyInputFormat}, mapped through
     *       {@code AvroUnwrapperFunction}.</li>
     *   <li>Output with an empty field delimiter: the tagged pair DStream is
     *       mapped through {@code AvroStoreFunction} and saved with
     *       {@code AvroKeyOutputFormat}.</li>
     *   <li>Output with a non-empty field delimiter: nothing is done.
     *       NOTE(review): confirm whether delimited output is intentionally
     *       unsupported here.</li>
     * </ul>
     *
     * @param df    the data flow to extend; it is cast to
     *              {@link SparkStreamingDataFlow} without a prior type check
     * @param mSpec the I/O description (direction, path, tag, schema, delimiter)
     * @throws RuntimeException if a Hadoop {@link Job} instance cannot be
     *                          obtained for the Avro output case
     */
    @Override
    public void build(DataFlow<?> df, IoSpec mSpec) {
        SparkStreamingDataFlow streamingFlow = (SparkStreamingDataFlow) df;
        boolean reading = mSpec.isInput();
        boolean delimited = StringUtils.isNotEmpty((String) mSpec.getFieldDelimiter());

        if (reading) {
            JavaPairDStream records = delimited
                    ? readDelimited(streamingFlow, mSpec)
                    : readAvro(streamingFlow, mSpec);
            streamingFlow.putPairDStream(mSpec.getTag(), records);
            return;
        }

        if (delimited) {
            // No sink is set up when an output spec carries a field delimiter.
            return;
        }

        writeAvro(streamingFlow, mSpec);
    }

    /** Builds the pair stream for delimited text files found under the spec's path. */
    private static JavaPairDStream readDelimited(SparkStreamingDataFlow flow, IoSpec spec) {
        // The schema is handed to the mapping function in its string form.
        String schemaText = spec.getSchema().toString();
        return flow.ssc()
                .textFileStream(spec.getPath())
                .mapToPair((PairFunction) new SparkIoDataFlowBuilderFunctions.FlatToAvroFunction(
                        spec.getFieldDelimiter(), schemaText));
    }

    /** Builds the pair stream for files under the spec's path read via {@code AvroKeyInputFormat}. */
    private static JavaPairDStream readAvro(SparkStreamingDataFlow flow, IoSpec spec) {
        return flow.ssc()
                .fileStream(spec.getPath(), AvroWrapper.class, NullWritable.class, AvroKeyInputFormat.class)
                .mapToPair((PairFunction) new SparkIoDataFlowBuilderFunctions.AvroUnwrapperFunction());
    }

    /** Saves the pair stream registered under the spec's tag using {@code AvroKeyOutputFormat}. */
    private static void writeAvro(SparkStreamingDataFlow flow, IoSpec spec) {
        Job hadoopJob = newHadoopJob();
        // The job's configuration carries the output key schema to the output format.
        AvroJob.setOutputKeySchema(hadoopJob, (Schema) spec.getSchema());
        flow.getPairDStream(spec.getTag())
                .mapToPair(new SparkIoDataFlowBuilderFunctions.AvroStoreFunction())
                .saveAsNewAPIHadoopFiles(
                        spec.getPath(),
                        "",
                        AvroKey.class,
                        NullWritable.class,
                        AvroKeyOutputFormat.class,
                        hadoopJob.getConfiguration());
    }

    /** Obtains a Hadoop job instance, rethrowing the checked I/O failure as unchecked. */
    private static Job newHadoopJob() {
        try {
            return Job.getInstance();
        } catch (IOException e) {
            throw new RuntimeException("Cannot get job instance.", e);
        }
    }
}

