/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.technical.tcollectandcheck;

import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;
import java.util.concurrent.TimeoutException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.common.testutils.TestUtils;
import org.talend.bigdata.structuredstreaming.parquet.ImmutableTFileInputParquet;
import org.talend.bigdata.structuredstreaming.technical.tcollectandcheck.ImmutableTCollectAndCheck;
import org.talend.bigdata.structuredstreaming.technical.tcollectandcheck.ImmutableTCollectAndCheckMapper;
import org.talend.bigdata.structuredstreaming.technical.tcollectandcheck.ImmutableTCollectAndCheckWriter;
import org.talend.bigdata.structuredstreaming.technical.tcollectandcheck.TCollectAndCheckMapper;
import org.talend.bigdata.structuredstreaming.technical.tcollectandcheck.TCollectAndCheckWriter;
import org.talend.bigdata.structuredstreaming.utils.ConsoleLinesCaptor;
import org.talend.bigdata.structuredstreaming.utils.TestUtil;

/**
 * Tests for the {@code tCollectAndCheck} structured-streaming component.
 *
 * <p>Each test copies the {@code testFile_1.parquet} fixture into a per-test temporary folder,
 * reads it through {@link ImmutableTFileInputParquet}, and runs the resulting dataset through an
 * {@link ImmutableTCollectAndCheck.DataFrame} configured with a list of reference lines.
 *
 * <p>The tests switch the JVM default time zone to UTC; the previous default is captured once
 * before the tests run and restored afterwards so that this class does not leak global state.
 */
public class TCollectAndCheckTest {
    public static final String DATAFILES_PARQUET = "datafiles/parquet";
    /** Classpath-relative location of the parquet fixture used by every test. */
    private static final String PARQUET_RESOURCE = DATAFILES_PARQUET + "/testFile_1.parquet";
    /** Component name handed to the writer; it also appears in the messages the tests look for. */
    private static final String COMPONENT_NAME = "tCollectAndCheck_1";
    /** Separator used both by the mapper and by the reference lines below. */
    private static final String FIELD_SEPARATOR = ";";
    /** Upper bound, in milliseconds, passed to {@code awaitTermination}. */
    private static final long AWAIT_TERMINATION_MS = 10000L;
    private static final Logger log = LogManager.getLogger();
    private static SparkSession sparkSession;
    /** JVM default time zone as it was before this class ran; restored in {@link #tearDown()}. */
    private static TimeZone originalTimeZone;
    @TempDir
    Path testWorkingFolder;

    /**
     * Remembers the current default time zone, points {@code hadoop.home.dir} at the winutils
     * folder under the test resources, and obtains the shared Spark session.
     */
    @BeforeAll
    public static void setUpAll() {
        originalTimeZone = TimeZone.getDefault();
        System.setProperty("hadoop.home.dir", new File("src/test/resources/winutils/").getAbsolutePath());
        sparkSession = TestData.SPARK.sparkSessionWithUTCConf(false);
    }

    /**
     * Stops the Spark session (if it was created) and restores the default time zone captured in
     * {@link #setUpAll()}.
     */
    @AfterAll
    public static void tearDown() {
        try {
            // Guard against setUpAll() having failed before the session was assigned.
            if (sparkSession != null) {
                sparkSession.stop();
            }
        } finally {
            if (originalTimeZone != null) {
                TimeZone.setDefault(originalTimeZone);
            }
        }
    }

    /**
     * Forces the default time zone to UTC and creates the parquet sub-folder inside the per-test
     * temporary directory.
     *
     * @throws IOException if the directory cannot be created
     */
    @BeforeEach
    void setUp() throws IOException {
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        TestUtil.createTempDirectory(this.testWorkingFolder.resolve(DATAFILES_PARQUET));
    }

    /**
     * Runs the component with two reference lines and checks the captured console output:
     * five lines in total, the two reference lines at positions 1 and 2, and two lines reporting
     * that the batch data is correct.
     */
    @Test
    public void test_collect_check_ok() throws IOException, URISyntaxException {
        List<String> referenceList = new ArrayList<>();
        referenceList.add("true;1;123;2020-06-05;2020-06-05 12:00:00;1.0;1.0;1;1;1;abc;a");
        referenceList.add("false;9;789;2020-06-06;2020-06-06 12:00:00;9.0;9.0;9;9;9;xyz;z");
        ImmutableTCollectAndCheck.DataFrame tCollectAndCheck = buildCollectAndCheck(referenceList);

        List<String> collectedStringList = ConsoleLinesCaptor.getCapturedLinesList(() -> {
            try {
                tCollectAndCheck.start().awaitTermination(AWAIT_TERMINATION_MS);
            } catch (TimeoutException | StreamingQueryException e) {
                // The captor's callback cannot throw checked exceptions; keep the cause attached.
                throw new RuntimeException(e);
            }
        });

        // The captured lines are part of the failure message so a mismatch can be diagnosed.
        Assertions.assertEquals(5, collectedStringList.size(),
                () -> "Unexpected captured console lines: " + collectedStringList);
        Assertions.assertEquals(referenceList.get(0), collectedStringList.get(1));
        Assertions.assertEquals(referenceList.get(1), collectedStringList.get(2));
        long correctBatchMessages = collectedStringList.stream()
                .filter(line -> line.contains(COMPONENT_NAME + " batch' data is correct"))
                .count();
        Assertions.assertEquals(2L, correctBatchMessages);
    }

    /**
     * Runs the component with three reference lines and expects the streaming query to fail with
     * a message stating that 3 lines were expected but 2 were found.
     */
    @Test
    public void test_collect_check_exception_raised_references_greater_than_incoming_data() throws IOException, URISyntaxException {
        List<String> referenceList = new ArrayList<>();
        referenceList.add("true;1;123;2020-06-05;2020-06-05 12:00:00;1.0;1.0;1;1;1;abc;a");
        referenceList.add("false;9;789;2020-06-06;2020-06-06 12:00:00;9.0;9.0;9;9;9;xyz;z");
        referenceList.add("false;10;567;2020-07-06;2020-07-06 12:00:00;8.0;8.0;8;8;8;qsd;q");
        ImmutableTCollectAndCheck.DataFrame tCollectAndCheck = buildCollectAndCheck(referenceList);

        StreamingQueryException streamingQueryException = Assertions.assertThrows(
                StreamingQueryException.class,
                () -> tCollectAndCheck.start().awaitTermination(AWAIT_TERMINATION_MS));
        Assertions.assertTrue(streamingQueryException.getMessage()
                .contains("number of lines do not match, 3 lines expected but 2 lines found"));
    }

    /**
     * Stages the parquet fixture in the temporary folder and wires the component under test.
     *
     * @param referenceList the reference lines given to the writer
     * @return the component, built but not yet started
     * @throws IOException declared because the calling tests declare it; presumably raised while
     *         staging the fixture
     * @throws URISyntaxException declared because the calling tests declare it; presumably raised
     *         while resolving the fixture resource
     */
    private ImmutableTCollectAndCheck.DataFrame buildCollectAndCheck(List<String> referenceList) throws IOException, URISyntaxException {
        TestUtils.getResourceFileUrlInsideTempFolder(PARQUET_RESOURCE, this.testWorkingFolder.toString(), this.getClass());
        String inputPath = this.testWorkingFolder.resolve(DATAFILES_PARQUET).toAbsolutePath() + File.separator;

        TCollectAndCheckMapper mapper = ImmutableTCollectAndCheckMapper.builder()
                .separator(FIELD_SEPARATOR)
                .build();
        TCollectAndCheckWriter writer = ImmutableTCollectAndCheckWriter.builder()
                .logger(log)
                .componentName(COMPONENT_NAME)
                .addAllReferenceDataList(referenceList)
                .build();

        // The read() result is passed straight to input() so no raw Dataset local is needed.
        return ImmutableTCollectAndCheck.DataFrame.builder()
                .input(ImmutableTFileInputParquet.builder()
                        .sparkSession(sparkSession)
                        .addAllSchemaFieldDescriptions(TestUtil.immutableSchemaStructureFieldDescriptionList)
                        .addPaths(inputPath)
                        .build()
                        .read())
                .tCollectAndCheckMapper(mapper)
                .tCollectAndCheckWriter(writer)
                .build();
    }
}

