/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.parquet;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.common.testutils.TestUtils;
import org.talend.bigdata.structuredstreaming.common.input.StreamReader;
import org.talend.bigdata.structuredstreaming.parquet.ImmutableTFileInputParquet;
import org.talend.bigdata.structuredstreaming.utils.TestUtil;

public class TFileInputParquetTest {
    @TempDir
    Path testWorkingFolder;

    @BeforeEach
    void setUp() throws Exception {
        Path parquetDir = this.testWorkingFolder.resolve("datafiles/parquet/");
        TestUtil.createTempDirectory(parquetDir);
    }

    @Test
    void testParquetFileInput() throws Exception {
        System.setProperty("hadoop.home.dir", new File("src/test/resources/winutils/").getAbsolutePath());
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        ArrayList<String> referenceList = new ArrayList<String>();
        referenceList.add("[true,1,123,2020-06-05,2020-06-05 12:00:00.0,1.0,1.0,1,1,1,abc,a]");
        referenceList.add("[false,9,789,2020-06-06,2020-06-06 12:00:00.0,9.0,9.0,9,9,9,xyz,z]");
        Path parquetDir = this.testWorkingFolder.resolve("");
        TestUtils.getResourceFileUrlInsideTempFolder((String)"datafiles/parquet/testFile_1.parquet", (String)parquetDir.toString(), this.getClass());
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        Dataset parquetFileTFileInputParquet = ImmutableTFileInputParquet.builder().sparkSession(sparkSession).addAllSchemaFieldDescriptions(TestUtil.immutableSchemaStructureFieldDescriptionList).addPaths(parquetDir.resolve("datafiles").resolve("parquet").toAbsolutePath() + File.separator).build().read();
        parquetFileTFileInputParquet.writeStream().format("memory").queryName("myTable").outputMode("append").start().processAllAvailable();
        List rowList = sparkSession.sql("SELECT * FROM myTable").collectAsList();
        Assertions.assertEquals((int)2, (int)rowList.size());
        Assertions.assertEquals(referenceList.get(0), (Object)((Row)rowList.get(0)).toString());
        Assertions.assertEquals(referenceList.get(1), (Object)((Row)rowList.get(1)).toString());
    }

    @Test
    void applyLocalTimezoneIfNeededTest() throws TimeoutException, IOException, URISyntaxException {
        System.setProperty("hadoop.home.dir", new File("src/test/resources/winutils/").getAbsolutePath());
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        ArrayList<String> referenceList = new ArrayList<String>();
        referenceList.add("[true,1,123,2020-06-05,2020-06-05 12:00:00.0,1.0,1.0,1,1,1,abc,a]");
        referenceList.add("[false,9,789,2020-06-06,2020-06-06 12:00:00.0,9.0,9.0,9,9,9,xyz,z]");
        Path parquetDir = this.testWorkingFolder.resolve("");
        TestUtils.getResourceFileUrlInsideTempFolder((String)"datafiles/parquet/testFile_1.parquet", (String)parquetDir.toString(), this.getClass());
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        Dataset parquetFileTFileInputParquet = ImmutableTFileInputParquet.builder().sparkSession(sparkSession).addAllSchemaFieldDescriptions(TestUtil.immutableSchemaStructureFieldDescriptionList).addPaths(parquetDir.resolve("datafiles").resolve("parquet").toAbsolutePath() + File.separator).readDatesInLocalTimeZone(true).build().read();
        parquetFileTFileInputParquet.writeStream().format("memory").queryName("myTable2").outputMode("append").start().processAllAvailable();
        List rowList = sparkSession.sql("SELECT * FROM myTable2").collectAsList();
        Assertions.assertEquals((int)2, (int)rowList.size());
        Assertions.assertEquals(referenceList.get(0), (Object)((Row)rowList.get(0)).toString());
        Assertions.assertEquals(referenceList.get(1), (Object)((Row)rowList.get(1)).toString());
    }

    @Test
    void testReadWithProcessinTimeWatermarking() throws TimeoutException, IOException, URISyntaxException {
        System.setProperty("hadoop.home.dir", new File("src/test/resources/winutils/").getAbsolutePath());
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        Path parquetDir = this.testWorkingFolder.resolve("");
        TestUtils.getResourceFileUrlInsideTempFolder((String)"datafiles/parquet/testFile_1.parquet", (String)parquetDir.toString(), this.getClass());
        Dataset parquetFileTFileInputParquet = ImmutableTFileInputParquet.builder().sparkSession(sparkSession).addAllSchemaFieldDescriptions(TestUtil.immutableSchemaStructureFieldDescriptionList).addPaths(parquetDir.resolve("datafiles").resolve("parquet").toAbsolutePath() + File.separator).enableWatermarking(StreamReader.WatermarkingType.PROCESSING_TIME).build().read();
        parquetFileTFileInputParquet.writeStream().format("memory").queryName("testReadWithProcessinTimeWatermarking").outputMode("append").start().processAllAvailable();
        Dataset data = sparkSession.sql("SELECT * FROM testReadWithProcessinTimeWatermarking");
        StructType schema = data.schema();
        Assertions.assertEquals((int)13, (int)schema.fields().length);
        Assertions.assertTrue((boolean)Arrays.asList(schema.fieldNames()).contains("processing_time"));
    }
}

