/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.common.input;

import java.util.ArrayList;
import java.util.Optional;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.talend.bigdata.common.Component;
import org.talend.bigdata.common.input.Reader;
import org.talend.bigdata.common.utils.Utils;

/**
 * Reader contract for sources consumed through Spark Structured Streaming.
 *
 * <p>Implementations supply the configuration accessors declared here (plus the
 * ones inherited from {@link Reader} and {@link Component.WithSchemaDescription});
 * the default methods assemble a {@link DataStreamReader} from them, load the
 * streaming {@code Dataset<Row>} and optionally attach a watermark.
 */
public interface StreamReader extends Reader<Row>, Component.WithSchemaDescription {

    /**
     * Watermarking mode to apply to the loaded dataset.
     *
     * @return the mode, or an empty {@code Optional} to leave the dataset
     *         without a watermark (see {@link #withWatermarking(Dataset)})
     */
    Optional<WatermarkingType> enableWatermarking();

    /**
     * Name of the column handed to {@code Dataset.withWatermark} when the mode is
     * {@link WatermarkingType#EVENT_TIME}.
     *
     * @return the column name; when empty, {@link #withWatermarking(Dataset)}
     *         falls back to the empty string
     */
    Optional<String> watermarkColumn();

    /**
     * Delay threshold handed to {@code Dataset.withWatermark}.
     *
     * @return the delay expression; when empty, {@link #withWatermarking(Dataset)}
     *         falls back to {@code "0 second"}
     */
    Optional<String> watermarkDelay();

    /**
     * Source format handed to {@code DataStreamReader.format}.
     *
     * @return the format name; when empty, {@link #dataStreamReader()} does not
     *         call {@code format} at all
     */
    Optional<String> format();

    /**
     * Builds the Spark schema from the declared schema field descriptions.
     *
     * @return the schema, or an empty {@code Optional} when
     *         {@code schemaFieldDescriptions()} is empty
     */
    default Optional<StructType> schema() {
        if (this.schemaFieldDescriptions().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(this.convertSchemaFieldDescriptionsToStructType());
    }

    /**
     * Creates a {@link DataStreamReader} from the Spark session, configured with
     * {@code options()} and, when present, {@link #format()} and {@link #schema()}.
     *
     * @return the configured stream reader (not yet loaded)
     */
    default DataStreamReader dataStreamReader() {
        DataStreamReader reader = this.sparkSession().readStream().options(this.options());
        // The returned reader is ignored on purpose, exactly as the original code
        // did: the calls are made for their effect on `reader`.
        this.format().ifPresent(reader::format);
        this.schema().ifPresent(reader::schema);
        return reader;
    }

    /**
     * Loads the streaming dataset and applies the configured watermarking.
     *
     * @return the result of {@link #withWatermarking(Dataset)} applied to the
     *         dataset loaded from {@link #dataStreamReader()}
     */
    default Dataset<Row> load() {
        return this.withWatermarking(this.dataStreamReader().load());
    }

    /**
     * Attaches a watermark to {@code ds} according to {@link #enableWatermarking()}.
     *
     * <ul>
     *   <li>no mode configured: {@code ds} is returned unchanged;</li>
     *   <li>{@link WatermarkingType#EVENT_TIME}: the watermark is set on
     *       {@link #watermarkColumn()};</li>
     *   <li>any other mode: a {@code processing_time} column holding
     *       {@code current_timestamp()} is added and the watermark is set on it.</li>
     * </ul>
     *
     * @param ds the dataset to decorate
     * @return {@code ds}, or a dataset derived from it carrying the watermark
     */
    default Dataset<Row> withWatermarking(Dataset<Row> ds) {
        // Resolve the mode once; an absent mode means "no watermark".
        WatermarkingType mode = this.enableWatermarking().orElse(null);
        if (mode == null) {
            return ds;
        }

        String delay = this.watermarkDelay().orElse("0 second");
        if (mode == WatermarkingType.EVENT_TIME) {
            // NOTE(review): a missing column name falls back to "" as before;
            // how Spark treats an empty column name is not visible here - confirm
            // whether this should be rejected up front instead.
            return ds.withWatermark(this.watermarkColumn().orElse(""), delay);
        }

        String processingTimeColumn = "processing_time";
        return ds.withColumn(processingTimeColumn, functions.current_timestamp())
                .withWatermark(processingTimeColumn, delay);
    }

    /**
     * Converts every entry of {@code schemaFieldDescriptions()} into a
     * {@link StructField} (name, parsed data type, nullability) and wraps them,
     * in iteration order, in a {@link StructType}.
     */
    private StructType convertSchemaFieldDescriptionsToStructType() {
        ArrayList<StructField> fields = new ArrayList<>();
        for (Component.SchemaFieldDescription desc : this.schemaFieldDescriptions()) {
            DataType type = Utils.parseDataType(desc.type());
            fields.add(DataTypes.createStructField(desc.name(), type, desc.isNullable()));
        }
        return DataTypes.createStructType(fields);
    }

    /** Watermarking modes understood by {@link #withWatermarking(Dataset)}. */
    enum WatermarkingType {
        /** Watermark on the column named by {@link #watermarkColumn()}. */
        EVENT_TIME,
        /** Watermark on a generated {@code processing_time} column. */
        PROCESSING_TIME;
    }
}

