/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.parquet;

import java.io.File;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.TimeZone;
import java.util.stream.Stream;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.CleanupMode;
import org.junit.jupiter.api.io.TempDir;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.structuredstreaming.common.output.FileWriter;
import org.talend.bigdata.structuredstreaming.parquet.ImmutableTFileInputParquet;
import org.talend.bigdata.structuredstreaming.parquet.ImmutableTFileOutputParquet;
import org.talend.bigdata.structuredstreaming.utils.TestUtil;

/**
 * Integration tests for {@link ImmutableTFileOutputParquet}.
 *
 * <p>Each test copies the bundled {@code testFile_1.parquet} resource into the per-test
 * temporary folder, reads it through {@link ImmutableTFileInputParquet}, writes it back with
 * a streaming query and then inspects what was written to the {@code output} directory.
 *
 * <p>The tests modify the JVM default time zone. Every test restores the previous default in
 * a {@code finally} block and stops its streaming query before reading the output, so that
 * neither the time zone nor a still-running query leaks into other tests.
 */
class TFileOutputParquetTest {
    /** Classpath location of the parquet fixture shared by all tests. */
    private static final String FIXTURE_RESOURCE = "/datafiles/parquet/testFile_1.parquet";

    /** Upper bound, in milliseconds, that a test waits for its streaming query. */
    private static final long QUERY_TIMEOUT_MS = 5000L;

    @TempDir(cleanup=CleanupMode.ALWAYS)
    Path testWorkingFolder;

    /**
     * Writes the fixture unchanged with a one-second processing-time trigger and checks that
     * both rows are read back with the input schema.
     */
    @Test
    void testFileOutputParquet_basic() throws Exception {
        useBundledWinutils();
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        List<String> expectedRows = List.of(
                "[true,1,123,2020-06-05,2020-06-05 12:00:00.0,1.0,1.0,1,1,1,abc,a]",
                "[false,9,789,2020-06-06,2020-06-06 12:00:00.0,9.0,9.0,9,9,9,xyz,z]");
        String parquetPath = stageFixture();
        TimeZone previousTimeZone = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        try {
            Dataset<Row> input = readInput(sparkSession, parquetPath);
            ImmutableTFileOutputParquet output = ImmutableTFileOutputParquet.builder()
                    .sparkSession(sparkSession)
                    .path(outputLocation())
                    .input(input)
                    .format(FileWriter.FileFormat.PARQUET)
                    .putOptions("checkpointLocation", checkpointLocation("testLoad"))
                    .trigger(Trigger.ProcessingTime("1 second"))
                    .build();
            runAndStop(output);
            Dataset<Row> outputData = sparkSession.read().schema(input.schema()).parquet(outputLocation());
            assertRows(expectedRows, outputData.collectAsList());
        } finally {
            TimeZone.setDefault(previousTimeZone);
        }
    }

    /**
     * Writes the fixture with {@code sortColumnsAlphabetically(true)} and checks that the
     * columns of the written data are in sorted order and that both rows match.
     */
    @Test
    void testFileOutputParquet_sort() throws Exception {
        useBundledWinutils();
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        List<String> expectedRows = List.of(
                "[true,1,a,2020-06-05,1.0,1.0,1,1,1,abc,2020-06-05 12:00:00.0,123]",
                "[false,9,z,2020-06-06,9.0,9.0,9,9,9,xyz,2020-06-06 12:00:00.0,789]");
        String parquetPath = stageFixture();
        TimeZone previousTimeZone = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        try {
            Dataset<Row> input = readInput(sparkSession, parquetPath);
            ImmutableTFileOutputParquet output = ImmutableTFileOutputParquet.builder()
                    .sparkSession(sparkSession)
                    .path(outputLocation())
                    .input(input)
                    .format(FileWriter.FileFormat.PARQUET)
                    .putOptions("checkpointLocation", checkpointLocation("testLoad"))
                    .sortColumnsAlphabetically(true)
                    .trigger(Trigger.AvailableNow())
                    .build();
            runAndStop(output);
            // No explicit schema here: the column order must come from the written files.
            Dataset<Row> outputData = sparkSession.read().parquet(outputLocation());
            String[] actualColumns = outputData.columns();
            String[] sortedColumns = actualColumns.clone();
            Arrays.sort(sortedColumns);
            Assertions.assertArrayEquals(sortedColumns, actualColumns);
            assertRows(expectedRows, outputData.collectAsList());
        } finally {
            TimeZone.setDefault(previousTimeZone);
        }
    }

    /**
     * Writes the fixture partitioned by {@code t_Boolean} and checks that both partition
     * directories exist, each contains a parquet file and each holds exactly one row.
     */
    @Test
    void testFileOutputParquet_partition() throws Exception {
        useBundledWinutils();
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        String parquetPath = stageFixture();
        TimeZone previousTimeZone = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        try {
            Dataset<Row> input = readInput(sparkSession, parquetPath);
            ImmutableTFileOutputParquet output = ImmutableTFileOutputParquet.builder()
                    .sparkSession(sparkSession)
                    .path(outputLocation())
                    .input(input)
                    .format(FileWriter.FileFormat.PARQUET)
                    .putOptions("checkpointLocation", checkpointLocation("testPartition"))
                    .partitionColumns(new String[]{"t_Boolean"})
                    .trigger(Trigger.AvailableNow())
                    .build();
            runAndStop(output);

            Path outputPath = outputPath();
            Path truePartition = outputPath.resolve("t_Boolean=true");
            Path falsePartition = outputPath.resolve("t_Boolean=false");
            Assertions.assertTrue(Files.exists(truePartition), "Partition directory t_Boolean=true should exist");
            Assertions.assertTrue(Files.exists(falsePartition), "Partition directory t_Boolean=false should exist");
            try (Stream<Path> truePartitionFiles = Files.list(truePartition)) {
                Assertions.assertTrue(truePartitionFiles.anyMatch(p -> p.toString().endsWith(".parquet")), "True partition should contain parquet files");
            }
            try (Stream<Path> falsePartitionFiles = Files.list(falsePartition)) {
                Assertions.assertTrue(falsePartitionFiles.anyMatch(p -> p.toString().endsWith(".parquet")), "False partition should contain parquet files");
            }

            Dataset<Row> outputData = sparkSession.read().parquet(outputPath.toAbsolutePath().toString());
            Assertions.assertEquals(2, outputData.collectAsList().size());
            Dataset<Row> truePartitionData = sparkSession.read().parquet(truePartition.toAbsolutePath().toString());
            Dataset<Row> falsePartitionData = sparkSession.read().parquet(falsePartition.toAbsolutePath().toString());
            Assertions.assertEquals(1L, truePartitionData.count());
            Assertions.assertEquals(1L, falsePartitionData.count());
        } finally {
            TimeZone.setDefault(previousTimeZone);
        }
    }

    /**
     * Writes the fixture partitioned by {@code t_Boolean} and {@code t_Byte} and checks that at
     * least two {@code name=value} directories were created and that two rows are read back.
     */
    @Test
    void testFileOutputParquet_multipartition() throws Exception {
        useBundledWinutils();
        SparkSession sparkSession = TestData.SPARK.sparkSession();
        String parquetPath = stageFixture();
        TimeZone previousTimeZone = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        try {
            Dataset<Row> input = readInput(sparkSession, parquetPath);
            ImmutableTFileOutputParquet output = ImmutableTFileOutputParquet.builder()
                    .sparkSession(sparkSession)
                    .path(outputLocation())
                    .input(input)
                    .format(FileWriter.FileFormat.PARQUET)
                    .putOptions("checkpointLocation", checkpointLocation("testMultiPartition"))
                    .partitionColumns(new String[]{"t_Boolean", "t_Byte"})
                    .trigger(Trigger.ProcessingTime("1 second"))
                    .build();
            runAndStop(output);

            Path outputPath = outputPath();
            List<Path> partitionDirs;
            try (Stream<Path> walkStream = Files.walk(outputPath)) {
                partitionDirs = walkStream
                        .filter(Files::isDirectory)
                        .filter(p -> p.getFileName().toString().contains("="))
                        .toList();
            }
            Assertions.assertTrue(partitionDirs.size() >= 2, "Should have partition directories");
            Dataset<Row> outputData = sparkSession.read().parquet(outputPath.toAbsolutePath().toString());
            Assertions.assertEquals(2L, outputData.count());
        } finally {
            TimeZone.setDefault(previousTimeZone);
        }
    }

    /**
     * Writes the fixture with {@code overrideDatesTZwithUTC(true)} while the JVM default time
     * zone is {@code GMT+2}, then reads the output back under {@code UTC}. The expected rows
     * carry a timestamp of {@code 14:00:00.0}, where the other tests expect {@code 12:00:00.0}.
     */
    @Test
    void testFileOutputParquet_forceUtc() throws Exception {
        // Keep the TimeZone object itself rather than its ID so the restore is exact.
        TimeZone previousTimeZone = TimeZone.getDefault();
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+2"));
        try {
            useBundledWinutils();
            SparkSession sparkSession = TestData.SPARK.sparkSession();
            List<String> expectedRows = List.of(
                    "[true,1,123,2020-06-05,2020-06-05 14:00:00.0,1.0,1.0,1,1,1,abc,a]",
                    "[false,9,789,2020-06-06,2020-06-06 14:00:00.0,9.0,9.0,9,9,9,xyz,z]");
            String parquetPath = stageFixture();
            Dataset<Row> input = readInput(sparkSession, parquetPath);
            ImmutableTFileOutputParquet output = ImmutableTFileOutputParquet.builder()
                    .sparkSession(sparkSession)
                    .path(outputLocation())
                    .input(input)
                    .format(FileWriter.FileFormat.PARQUET)
                    .putOptions("checkpointLocation", checkpointLocation("testLoad"))
                    .trigger(Trigger.ProcessingTime("1 second"))
                    .overrideDatesTZwithUTC(true)
                    .build();
            runAndStop(output);
            // Read back under UTC so the rendered timestamps do not depend on GMT+2.
            TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
            Dataset<Row> outputData = sparkSession.read().schema(input.schema()).parquet(outputLocation());
            assertRows(expectedRows, outputData.collectAsList());
        } finally {
            TimeZone.setDefault(previousTimeZone);
        }
    }

    /**
     * Points {@code hadoop.home.dir} at the winutils directory under the test resources.
     * NOTE(review): presumably only needed for Hadoop on Windows - confirm.
     */
    private static void useBundledWinutils() {
        System.setProperty("hadoop.home.dir", new File("src/test/resources/winutils/").getAbsolutePath());
    }

    /**
     * Copies the parquet fixture from the classpath into the temporary working folder.
     *
     * @return absolute path of the directory holding the copy, with a trailing file separator
     * @throws Exception if the resource is missing or the copy fails
     */
    private String stageFixture() throws Exception {
        Path sourceFile = Paths.get(Objects.requireNonNull(
                this.getClass().getResource(FIXTURE_RESOURCE),
                "Missing test resource " + FIXTURE_RESOURCE).toURI());
        Path targetFile = this.testWorkingFolder.resolve("datafiles/parquet/testFile_1.parquet");
        Files.createDirectories(targetFile.getParent());
        Files.copy(sourceFile, targetFile, StandardCopyOption.REPLACE_EXISTING);
        return this.testWorkingFolder.resolve("datafiles").resolve("parquet").toAbsolutePath() + File.separator;
    }

    /**
     * Reads the staged fixture through {@link ImmutableTFileInputParquet} using the shared
     * schema description from {@link TestUtil}.
     *
     * @param sparkSession session used for the read
     * @param parquetPath  directory returned by {@link #stageFixture()}
     * @return the dataset handed to the output component under test
     * @throws Exception if the component fails to read
     */
    private static Dataset<Row> readInput(SparkSession sparkSession, String parquetPath) throws Exception {
        return ImmutableTFileInputParquet.builder()
                .sparkSession(sparkSession)
                .addAllSchemaFieldDescriptions(TestUtil.immutableSchemaStructureFieldDescriptionList)
                .addPaths(parquetPath)
                .build()
                .read();
    }

    /**
     * Starts the streaming query, waits up to {@link #QUERY_TIMEOUT_MS} for it and then always
     * stops it. Without the stop, a processing-time query outlives the wait and keeps running
     * while the test reads the output and while the temporary folder is deleted.
     *
     * @param output fully configured output component
     * @throws Exception if starting, awaiting or stopping the query fails
     */
    private static void runAndStop(ImmutableTFileOutputParquet output) throws Exception {
        StreamingQuery streamingQuery = output.start();
        try {
            streamingQuery.awaitTermination(QUERY_TIMEOUT_MS);
        } finally {
            streamingQuery.stop();
        }
    }

    /** Returns the {@code output} directory inside the temporary working folder. */
    private Path outputPath() {
        return this.testWorkingFolder.resolve("output");
    }

    /** Returns the absolute path string of the {@code output} directory. */
    private String outputLocation() {
        return outputPath().toAbsolutePath().toString();
    }

    /**
     * Builds the checkpoint location for one query.
     *
     * @param queryName sub-directory name appended below the {@code checkpoint} folder
     * @return absolute checkpoint path, joined with {@code /}
     */
    private String checkpointLocation(String queryName) {
        return this.testWorkingFolder.resolve("checkpoint").toAbsolutePath().toString() + "/" + queryName;
    }

    /**
     * Asserts that the rows match the expected string renderings, position by position.
     *
     * @param expectedRows expected {@code Row.toString()} values, in order
     * @param actualRows   rows collected from the written output
     */
    private static void assertRows(List<String> expectedRows, List<Row> actualRows) {
        Assertions.assertEquals(expectedRows.size(), actualRows.size());
        for (int i = 0; i < expectedRows.size(); i++) {
            Assertions.assertEquals(expectedRows.get(i), actualRows.get(i).toString());
        }
    }
}

