/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.iceberg;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.streaming.Trigger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.structuredstreaming.iceberg.ImmutableTIcebergOutput;

public class TIcebergOutputTest {
    private static SparkSession ss;
    private static final String CHECKPOINT_PATH = "/tmp/cp";

    @BeforeAll
    public static void setUp() {
        SparkConf conf = new SparkConf(true);
        conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
        conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog");
        conf.set("spark.sql.catalog.local.type", "hadoop");
        conf.set("spark.sql.catalog.local.warehouse", "/tmp/warehouse");
        conf.set("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true");
        conf.set("spark.sql.defaultCatalog", "local");
        ss = TestData.SPARK.sparkSession((SparkConf)conf);
        ss.sql("CREATE DATABASE IF NOT EXISTS mydb");
    }

    @AfterAll
    public static void tearDown() {
        ss.stop();
    }

    @BeforeEach
    public void dropTable() throws IOException {
        ss.sql("DROP TABLE IF EXISTS local.mydb.mytable");
        FileUtils.deleteDirectory((File)new File(CHECKPOINT_PATH));
    }

    @Test
    @Disabled(value="To be fixed for Jenkins")
    public void testStart() throws StreamingQueryException, InterruptedException {
        ss.sql("CREATE TABLE local.mydb.mytable (timestamp timestamp, value long)");
        Dataset data = ss.readStream().format("rate").load();
        ImmutableTIcebergOutput immutableTIcebergOutput = ImmutableTIcebergOutput.builder().table("local.mydb.mytable").putOptions("checkpointLocation", "/tmp/cp/testStart").trigger(Trigger.ProcessingTime((String)"1 second")).input(data).build();
        StreamingQuery query = immutableTIcebergOutput.start();
        query.awaitTermination(3900L);
        Dataset ds2 = ss.sql("SELECT * FROM local.mydb.mytable");
        Thread.sleep(2000L);
        Assertions.assertTrue((ds2.count() >= 5L ? 1 : 0) != 0);
        ds2.show();
    }

    @Test
    public void testToTable() throws StreamingQueryException, TimeoutException, InterruptedException {
        ss.sql("CREATE TABLE local.mydb.mytable (timestamp timestamp, value long)");
        Dataset data = ss.readStream().format("rate").load();
        ImmutableTIcebergOutput immutableTIcebergOutput = ImmutableTIcebergOutput.builder().table("local.mydb.mytable").putOptions("checkpointLocation", "/tmp/cp/testLoad").trigger(Trigger.ProcessingTime((String)"1 second")).input(data).build();
        StreamingQuery query = immutableTIcebergOutput.toTable();
        query.awaitTermination(4500L);
        Dataset ds2 = ss.sql("SELECT * FROM local.mydb.mytable");
        Thread.sleep(2000L);
        Assertions.assertTrue((ds2.count() >= 5L ? 1 : 0) != 0);
        ds2.show();
    }
}

