/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.iceberg;

import java.util.concurrent.TimeoutException;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.talend.bigdata.common.Component;
import org.talend.bigdata.common.ImmutableSchemaFieldDescription;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.structuredstreaming.iceberg.ImmutableTIcebergInput;
import org.talend.bigdata.structuredstreaming.iceberg.ImmutableTIcebergOutput;

public class TIcebergInputTest {
    private static SparkSession ss;

    @BeforeAll
    public static void setUp() {
        SparkConf conf = new SparkConf(true);
        conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
        conf.set("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog");
        conf.set("spark.sql.catalog.local.type", "hadoop");
        conf.set("spark.sql.catalog.local.warehouse", "/tmp/warehouse");
        conf.set("spark.sql.catalog.local.default.write.metadata-flush-after-create", "true");
        conf.set("spark.sql.defaultCatalog", "local");
        ss = TestData.SPARK.sparkSession((SparkConf)conf);
        ss.sql("CREATE DATABASE IF NOT EXISTS mydb");
    }

    @AfterAll
    public static void tearDown() {
        ss.stop();
    }

    public void createTable(String table) throws NoSuchTableException, TimeoutException, StreamingQueryException {
        ss.sql("DROP TABLE IF EXISTS local.default." + table);
        Dataset ds_row1 = ss.readStream().format("rate").option("rowsPerSecond", 1L).option("rampUpTime", 0L).load();
        StreamingQuery query = ImmutableTIcebergOutput.builder().table("local.default." + table).input(ds_row1).outputMode(OutputMode.Append()).putOptions("checkpointLocation", "/tmp/cplocation/" + table).build().toTable();
        int retries = 10;
        boolean success = false;
        for (int i = 0; i < retries; ++i) {
            long count = ss.read().format("iceberg").load("local.default." + table).count();
            if (count > 0L) {
                success = true;
                break;
            }
            try {
                Thread.sleep(500L);
                continue;
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
        }
        query.stop();
        if (!success) {
            throw new IllegalStateException("No data was written to the Iceberg table: " + table);
        }
    }

    @Test
    void testTableWithoutSchema() throws TimeoutException, StreamingQueryException, NoSuchTableException {
        this.createTable("mytablewithoutschema");
        ImmutableTIcebergInput tIcebergInput = ImmutableTIcebergInput.builder().sparkSession(ss).tableName("local.default.mytablewithoutschema").build();
        StreamingQuery query = tIcebergInput.table().writeStream().format("memory").queryName("mytablewithoutschemaQuery").start();
        query.processAllAvailable();
        Dataset result = ss.sql("SELECT * FROM mytablewithoutschemaQuery");
        result.show(false);
        long count = result.count();
        Assertions.assertTrue((count > 0L ? 1 : 0) != 0, (String)"Expected data in the DataFrame, but found none.");
        query.stop();
    }

    @Test
    void testTableWithSchema() throws TimeoutException, StreamingQueryException, NoSuchTableException {
        this.createTable("mytablewithschema");
        ImmutableTIcebergInput tIcebergInput = ImmutableTIcebergInput.builder().sparkSession(ss).tableName("local.default.mytablewithschema").addSchemaFieldDescriptions(new Component.SchemaFieldDescription[]{ImmutableSchemaFieldDescription.builder().name("timestamp").type("timestamp").isNullable(true).build(), ImmutableSchemaFieldDescription.builder().name("value").type("long").isNullable(true).build()}).build();
        StreamingQuery query = tIcebergInput.table().writeStream().format("memory").queryName("mytablewithschemaQuery").start();
        query.processAllAvailable();
        Dataset result = ss.sql("SELECT * FROM mytablewithschemaQuery");
        long count = result.count();
        Assertions.assertTrue((count > 0L ? 1 : 0) != 0, (String)"Expected data in the Dataset, but found none.");
        query.stop();
    }
}

