/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.hive;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.talend.bigdata.common.Component;
import org.talend.bigdata.common.ImmutableSchemaFieldDescription;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.structuredstreaming.hive.ImmutableTHiveOutput;
import org.talend.bigdata.structuredstreaming.iceberg.ImmutableTIcebergInput;
import org.talend.bigdata.structuredstreaming.iceberg.ImmutableTIcebergOutput;

public class THiveOutputTest {
    private static SparkSession ss;
    private static final String CHECKPOINT_PATH = "/tmp/cp";

    @BeforeAll
    public static void setUp() {
        SparkConf conf = new SparkConf(true);
        conf.set("spark.sql.catalogImplementation", "hive");
        conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
        conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog");
        conf.set("spark.sql.catalog.local.type", "hadoop");
        conf.set("spark.sql.catalog.local.warehouse", "/tmp/warehouse");
        conf.set("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true");
        ss = TestData.SPARK.sparkSessionWithHive((SparkConf)conf);
    }

    @AfterAll
    public static void tearDown() {
        ss.stop();
    }

    @Test
    public void testExternalTable() throws TimeoutException, StreamingQueryException, InterruptedException, IOException {
        FileUtils.deleteDirectory((File)new File("/tmp/hive-warehouse/myexternaltable/"));
        FileUtils.deleteDirectory((File)new File("/tmp/cp/thiveoutputexternal"));
        ss.sql("DROP TABLE IF EXISTS mytable");
        ss.sql("CREATE EXTERNAL TABLE IF NOT EXISTS myexternaltable (timestamp timestamp, value long) USING PARQUET LOCATION '/tmp/hive-warehouse/myexternaltable/'");
        Dataset data = ss.readStream().format("rate").load();
        ImmutableTHiveOutput hiveOutput = ImmutableTHiveOutput.builder().putOptions("checkpointLocation", "/tmp/cp/thiveoutputexternal").putOptions("format", "parquet").putOptions("path", "/tmp/hive-warehouse/myexternaltable/").trigger(Trigger.ProcessingTime((String)"1 second")).input(data).external(true).build();
        StreamingQuery query = hiveOutput.start();
        query.awaitTermination(4500L);
        Dataset readDs = ss.sql("SELECT * FROM myexternaltable");
        Thread.sleep(2000L);
        readDs.show();
        Assertions.assertTrue((readDs.count() >= 2L ? 1 : 0) != 0);
    }

    @Test
    public void testManagedTable() throws IOException, TimeoutException, StreamingQueryException {
        FileUtils.deleteDirectory((File)new File("/tmp/cp/thiveoutputmanaged"));
        ss.sql("DROP TABLE IF EXISTS mymanagedtable");
        ss.sql("CREATE TABLE IF NOT EXISTS mymanagedtable (timestamp timestamp, value long) STORED AS PARQUET");
        Dataset data = ss.readStream().format("rate").load();
        StreamingQuery query = ImmutableTHiveOutput.builder().input(data).putOptions("checkpointLocation", "/tmp/cp/thiveoutputmanaged").format("parquet").outputMode(OutputMode.Append()).table("default.mymanagedtable").build().start();
        query.awaitTermination(4500L);
        Dataset readData = ss.sql("SELECT * FROM default.mymanagedtable");
        readData.show(false);
        Assertions.assertTrue((readData.count() >= 3L ? 1 : 0) != 0);
    }

    @Test
    @Disabled(value="Integration test to be run manually in dev env")
    public void testIcebergToHive() throws TimeoutException, StreamingQueryException {
        ss.sql("DROP TABLE IF EXISTS icebergtohivetable");
        ss.sql("CREATE TABLE IF NOT EXISTS icebergtohivetable (timestamp timestamp, value long) STORED AS PARQUET");
        Dataset ds_row1 = ss.readStream().format("rate").option("rowsPerSecond", 1L).option("rampUpTime", 0L).load();
        ImmutableTIcebergOutput.builder().table("local.default.ratetoicebergtable").input(ds_row1).outputMode(OutputMode.Append()).putOptions("checkpointLocation", "/tmp/checkpoint/ratetoicebergtable").build().toTable();
        Dataset ds = ImmutableTIcebergInput.builder().sparkSession(ss).tableName("local.default.ratetoicebergtable").build().table();
        StructType schema = new StructType().add("timestamp", DataTypes.TimestampType).add("value", DataTypes.LongType);
        ImmutableTHiveOutput.builder().putOptions("checkpointLocation", "/tmp/checkpoint/icebergtohivetable").putOptions("format", "parquet").input(ds).outputMode(OutputMode.Append()).table("default.icebergtohivetable").addSchemaFieldDescriptions((Component.SchemaFieldDescription)ImmutableSchemaFieldDescription.builder().name("timestamp").type("java.sql.Timestamp").build()).addSchemaFieldDescriptions((Component.SchemaFieldDescription)ImmutableSchemaFieldDescription.builder().name("value").type("java.lang.Integer").build()).external(false).build().start();
    }
}

