/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.parquet;

import java.io.Serializable;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.immutables.value.Value;
import org.talend.bigdata.structuredstreaming.common.output.FileWriter;

@Value.Immutable
public abstract class TFileOutputParquet
implements FileWriter {
    @Value.Default
    boolean sortColumnsAlphabetically() {
        return false;
    }

    @Value.Default
    String[] partitionColumns() {
        return new String[0];
    }

    @Value.Default
    boolean overrideDatesTZwithUTC() {
        return false;
    }

    @Override
    public StreamingQuery start() throws TimeoutException {
        Dataset<Row> df = this.input();
        if (this.sortColumnsAlphabetically()) {
            df = this.sortedColumnDataframe(df);
        }
        if (this.overrideDatesTZwithUTC()) {
            df = this.forceUTC(df);
        }
        DataStreamWriter writer = this.writer(df);
        if (this.partitionColumns().length > 0) {
            writer = writer.partitionBy(this.partitionColumns());
        }
        return writer.start(this.resolveOutputPath());
    }

    private Dataset<Row> sortedColumnDataframe(Dataset<Row> df) {
        Object[] columns = df.columns();
        Arrays.sort(columns);
        return df.select((Column[])Arrays.stream(columns).map(functions::col).toArray(Column[]::new));
    }

    private Dataset<Row> forceUTC(Dataset<Row> df) {
        df.sparkSession().udf().register("forceUTC", (UDF1 & Serializable)localTs -> {
            ZonedDateTime zdt = localTs.toLocalDateTime().atZone(ZoneId.of("UTC"));
            Instant instant = zdt.toInstant();
            return Timestamp.from(instant);
        }, DataTypes.TimestampType);
        return df.select((Column[])Arrays.stream(df.schema().fields()).map(f -> {
            if (f.dataType().equals(DataTypes.TimestampType)) {
                return functions.callUDF((String)"forceUTC", (Column[])new Column[]{functions.col((String)f.name())}).alias(f.name());
            }
            return functions.col((String)f.name());
        }).toArray(Column[]::new));
    }
}

