/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.technical.tfixedflowstreaminput;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.talend.bigdata.common.Component;
import org.talend.bigdata.common.ImmutableSchemaFieldDescription;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.structuredstreaming.common.input.StreamReader;
import org.talend.bigdata.structuredstreaming.technical.tfixedflowstreaminput.ImmutableTFixedFlowStreamInput;

/**
 * Tests for {@code ImmutableTFixedFlowStreamInput#load()}.
 *
 * <p>{@link #testLoad()} runs against the Spark session supplied by {@code TestData.SPARK};
 * {@link #testLoadSpark34()} runs against Mockito mocks only.
 */
public class TFixedFlowStreamInputTest {
    /** Session obtained once from {@code TestData.SPARK} and stopped in {@link #tearDown()}. */
    private static SparkSession ss;

    /** Streaming query started by {@link #testLoad()}; stopped after each test when set. */
    private static StreamingQuery query;

    /** Fetches the shared Spark session before any test runs. */
    @BeforeAll
    public static void setUp() {
        ss = TestData.SPARK.sparkSession();
    }

    /**
     * Builds a nullable schema field description with an empty pattern.
     *
     * @param name column name
     * @param type type string handed to the field description builder unchanged
     * @return the built field description
     */
    private static Component.SchemaFieldDescription nullableField(String name, String type) {
        return ImmutableSchemaFieldDescription.builder()
                .name(name)
                .type(type)
                .pattern("")
                .isNullable(true)
                .build();
    }

    /**
     * Loads a single fixed row declared with twelve typed columns, duplicated by 3, writes the
     * stream to the in-memory sink {@code writerquery}, and checks the resulting table has
     * 13 schema fields and 3 rows.
     *
     * @throws TimeoutException propagated from starting the streaming query
     */
    @Test
    void testLoad() throws TimeoutException {
        Component.SchemaFieldDescription[] schema = {
            nullableField("ByteColumn", "org.apache.spark.sql.types.DataTypes.ByteType"),
            nullableField("ShortColumn", "org.apache.spark.sql.types.DataTypes.ShortType"),
            nullableField("IntegerColumn", "org.apache.spark.sql.types.DataTypes.IntegerType"),
            nullableField("LongColumn", "org.apache.spark.sql.types.DataTypes.LongType"),
            nullableField("FloatColumn", "org.apache.spark.sql.types.DataTypes.FloatType"),
            nullableField("DoubleColumn", "org.apache.spark.sql.types.DataTypes.DoubleType"),
            nullableField("DecimalColumn", "org.apache.spark.sql.types.DecimalType"),
            nullableField("StringColumn", "org.apache.spark.sql.types.DataTypes.StringType"),
            nullableField("BinaryColumn", "org.apache.spark.sql.types.BinaryType"),
            nullableField("BooleanColumn", "org.apache.spark.sql.types.DataTypes.BooleanType"),
            nullableField("DateColumn", "org.apache.spark.sql.types.DataTypes.DateType"),
            nullableField("TimestampColumn", "org.apache.spark.sql.types.DataTypes.TimestampType")
        };

        // One value per declared column, in declaration order.
        Object[] values = {
            (byte) 1,
            (short) 1,
            1,
            1L,
            1.0f,
            1.0,
            new BigDecimal("1"),
            "abc",
            new byte[] {1, 2, 3, 4},
            true,
            Date.valueOf("2020-06-05"),
            Timestamp.valueOf("2020-06-05 12:00:00")
        };

        ImmutableTFixedFlowStreamInput input = ImmutableTFixedFlowStreamInput.builder()
                .sparkSession(ss)
                .addSchemaFieldDescriptions(schema)
                .rows(Arrays.asList(RowFactory.create(values)))
                .duplicateRowsBy(3)
                .putOptions("rowsPerBatch", "1")
                .enableWatermarking(StreamReader.WatermarkingType.PROCESSING_TIME)
                .watermarkDelay("10 seconds")
                .build();

        Dataset stream = input.load();
        query = stream.writeStream()
                .format("memory")
                .queryName("writerquery")
                .trigger(Trigger.AvailableNow())
                .start();
        query.processAllAvailable();

        Dataset result = ss.sql("SELECT * FROM writerquery");
        result.show(false);

        // NOTE(review): 13 is presumably the 12 declared columns plus one column added by
        // watermarking -- confirm against the component implementation.
        Assertions.assertEquals(13, result.schema().fields().length);
        Assertions.assertEquals(3L, result.count());
    }

    /**
     * Runs {@code load()} against a mocked session whose context reports version
     * {@code "3.4.0"} and verifies that the mocked streaming dataset is cross-joined with the
     * dataset returned by the mocked {@code createDataFrame}.
     */
    @Test
    void testLoadSpark34() {
        SparkSession spark = Mockito.mock(SparkSession.class);
        SparkContext context = Mockito.mock(SparkContext.class);
        DataStreamReader reader = Mockito.mock(DataStreamReader.class);
        Dataset ds = Mockito.mock(Dataset.class);
        Dataset staticRows = Mockito.mock(Dataset.class);

        // Session and version reporting.
        Mockito.when(spark.sparkContext()).thenReturn(context);
        Mockito.when(context.version()).thenReturn("3.4.0");

        // Stream reader chain: every fluent call hands back the same mock reader.
        Mockito.when(spark.readStream()).thenReturn(reader);
        Mockito.when(reader.format(ArgumentMatchers.anyString())).thenReturn(reader);
        Mockito.when(reader.options(ArgumentMatchers.anyMap())).thenReturn(reader);
        Mockito.when(reader.load()).thenReturn(ds);

        // Dataset transformations return the same mock so that chained calls keep working.
        Mockito.when(ds.crossJoin(ArgumentMatchers.any())).thenReturn(ds);
        Mockito.when(ds.drop(ArgumentMatchers.anyString())).thenReturn(ds);
        Mockito.when(spark.createDataFrame(ArgumentMatchers.anyList(), ArgumentMatchers.any(StructType.class)))
                .thenReturn(staticRows);

        Component.SchemaFieldDescription nameField = ImmutableSchemaFieldDescription.builder()
                .name("name")
                .type("org.apache.spark.sql.types.DataTypes.StringType")
                .isNullable(true)
                .build();
        ImmutableTFixedFlowStreamInput input = ImmutableTFixedFlowStreamInput.builder()
                .sparkSession(spark)
                .addSchemaFieldDescriptions(nameField)
                .build();

        input.load();

        Mockito.verify(ds).crossJoin(staticRows);
    }

    /**
     * Stops the streaming query after each test if one has been started.
     *
     * <p>The field is static and never cleared, so once {@link #testLoad()} has run this also
     * calls {@code stop()} after later tests.
     *
     * @throws TimeoutException propagated from {@code StreamingQuery#stop()}
     */
    @AfterEach
    void stopStreamingQuery() throws TimeoutException {
        if (query == null) {
            return;
        }
        query.stop();
    }

    /** Stops the shared Spark session once all tests have finished. */
    @AfterAll
    public static void tearDown() {
        if (ss == null) {
            return;
        }
        ss.stop();
    }
}

