/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.kafka;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.Schema;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.talend.bigdata.common.ImmutableSchemaFieldDescription;
import org.talend.bigdata.common.TestData;
import org.talend.bigdata.structuredstreaming.kafka.ImmutableTKafkaConfiguration;
import org.talend.bigdata.structuredstreaming.kafka.ImmutableTKafkaInputAvro;
import org.talend.bigdata.structuredstreaming.kafka.TKafkaConfiguration;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class TKafkaInputAvroTest
implements Serializable {
    // Docker network shared by the Kafka broker and the schema-registry containers below.
    private static final Network KAFKA_NETWORK = Network.newNetwork();
    // Confluent Kafka broker (cp-kafka 7.8.0) attached to KAFKA_NETWORK under the alias "kafka";
    // both its listener and its advertised listener are set to PLAINTEXT://kafka:9092.
    protected static final ConfluentKafkaContainer KAFKA = (ConfluentKafkaContainer)((ConfluentKafkaContainer)((ConfluentKafkaContainer)new ConfluentKafkaContainer("confluentinc/cp-kafka:7.8.0").withNetwork(KAFKA_NETWORK)).withNetworkAliases(new String[]{"kafka"})).withEnv(Map.of("KAFKA_ADVERTISED_LISTENERS", "PLAINTEXT://kafka:9092", "KAFKA_LISTENERS", "PLAINTEXT://kafka:9092"));
    // Schema registry (cp-schema-registry 7.8.0) on the same network. It exposes port 8081, uses
    // PLAINTEXT://kafka:9092 as its Kafka store, and its wait strategy is an HTTP check on
    // /subjects expecting status 200.
    private static final GenericContainer<?> SCHEMA_REGISTRY = new GenericContainer(DockerImageName.parse((String)"confluentinc/cp-schema-registry:7.8.0")).withNetwork(KAFKA_NETWORK).withExposedPorts(new Integer[]{8081}).withEnv(Map.of("SCHEMA_REGISTRY_HOST_NAME", "schema-registry", "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081", "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")).waitingFor((WaitStrategy)Wait.forHttp((String)"/subjects").forStatusCode(200));
    // Directory annotated with JUnit's @TempDir; testLoadWithRegistry derives its streaming
    // checkpoint location from it.
    @TempDir
    Path testWorkingFolder;
    // SparkSession assigned in the @BeforeAll method and stopped in tearDownAll.
    private static SparkSession sparkSession;
    // Avro record schema (as JSON) named DS_row1AvroRecord with 13 fields, each declared as a
    // union with "null". Parsed by testLoadWithoutRegistry and testWithMocks.
    private String avroSchema = "{\"type\":\"record\",\"name\":\"DS_row1AvroRecord\",\"namespace\":\"local.kafka_to_console_0_1\",\"fields\":[{\"name\":\"t_Boolean\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"t_Byte\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.Byte\"},\"null\"]},{\"name\":\"t_byteArray\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"byte[]\"},\"null\"]},{\"name\":\"t_Date\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.sql.Timestamp\"},\"null\"]},{\"name\":\"t_TimeStamp\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.sql.Timestamp\"},\"null\"]},{\"name\":\"t_Double\",\"type\":[\"double\",\"null\"]},{\"name\":\"t_Float\",\"type\":[\"float\",\"null\"]},{\"name\":\"t_BigDecimal\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.math.BigDecimal\"},\"null\"]},{\"name\":\"t_Integer\",\"type\":[\"int\",\"null\"]},{\"name\":\"t_Long\",\"type\":[\"long\",\"null\"]},{\"name\":\"t_Short\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.Short\"},\"null\"]},{\"name\":\"t_String\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]},{\"name\":\"t_Char\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]}]}";

    /**
     * Creates the Spark session shared by the tests of this class.
     *
     * <p>Despite its name, this method does not start the Kafka or schema-registry containers;
     * it only builds a local {@link SparkConf} and stores the resulting session in the static
     * {@code sparkSession} field.
     *
     * @throws IOException declared by the signature; nothing in the body throws it
     * @throws InterruptedException declared by the signature; nothing in the body throws it
     */
    @BeforeAll
    static void startKafkaAndRegistryContainers() throws IOException, InterruptedException {
        // Local master on all cores, WARN logging, and a warehouse directory under /tmp.
        SparkConf localConf = new SparkConf(true)
                .setMaster("local[*]")
                .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
                .set("spark.log.level", "WARN")
                .set("spark.sql.warehouse.dir", "file:////tmp/spark-warehouse");
        sparkSession = TestData.SPARK.sparkSession(localConf);
    }

    /**
     * Starts the Kafka broker and the schema registry, then registers the Avro value schema for
     * the {@code testdeux-value} subject through the registry's REST API.
     *
     * <p>The registry expects a JSON document whose {@code schema} member is the Avro schema
     * encoded as a JSON <em>string</em>, so the schema's own quotes (and backslashes) are escaped
     * before being embedded in the request body.
     *
     * @throws IOException if the HTTP request to the schema registry fails
     * @throws InterruptedException if the HTTP request is interrupted
     * @throws IllegalStateException if the registry does not answer with HTTP 200
     */
    private static void startContainers() throws IOException, InterruptedException {
        KAFKA.start();
        SCHEMA_REGISTRY.withStartupTimeout(Duration.of(5L, ChronoUnit.MINUTES));
        SCHEMA_REGISTRY.start();

        // Avro schema of the record value: two nullable string fields, "id" and "name".
        String valueSchema = "{\"type\":\"record\",\"name\":\"row1Struct\",\"fields\":[{\"name\":\"id\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]},{\"name\":\"name\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]}]}";
        // Embed the schema as a JSON string value: escape backslashes first, then quotes.
        String escapedSchema = valueSchema.replace("\\", "\\\\").replace("\"", "\\\"");
        String requestBody = "{\"schema\":\"" + escapedSchema + "\"}";

        URI registrationUri = URI.create("http://" + SCHEMA_REGISTRY.getHost() + ":" + SCHEMA_REGISTRY.getFirstMappedPort() + "/subjects/testdeux-value/versions");
        HttpRequest request = HttpRequest.newBuilder()
                .uri(registrationUri)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(requestBody))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        // Fail fast: a test relying on the registered schema cannot work if registration failed.
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Schema registration for subject testdeux-value failed with HTTP "
                    + response.statusCode() + ": " + response.body());
        }
    }

    /**
     * Stops the shared Spark session once all tests of the class have run.
     *
     * <p>The session may still be {@code null} if the {@code @BeforeAll} setup failed before
     * assigning it; in that case there is nothing to stop, and skipping the call avoids masking
     * the original failure with a {@link NullPointerException}.
     */
    @AfterAll
    public static void tearDownAll() {
        if (sparkSession != null) {
            sparkSession.stop();
        }
    }

    /**
     * Loads the {@code testdeux} topic through {@link ImmutableTKafkaInputAvro} with the schema
     * registry enabled, writes the stream to the in-memory table {@code counting}, and prints
     * the table content and row count five times at ten-second intervals.
     *
     * <p>Currently disabled. The broker ({@code localhost:9092}) and the registry URL
     * ({@code http://localhost:8085}) are hard-coded rather than taken from the containers.
     *
     * @throws TimeoutException if starting or stopping the streaming query times out
     * @throws StreamingQueryException declared by the signature
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    @Disabled(value="Disabled until schema registry container is fixed")
    @Test
    void testLoadWithRegistry() throws TimeoutException, StreamingQueryException, InterruptedException {
        System.setProperty("hadoop.home.dir", new File("src/test/resources/winutils/").getAbsolutePath());
        SparkConf sparkConfiguration = new SparkConf(true);
        sparkConfiguration.setMaster("local[*]");
        sparkConfiguration.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true");
        sparkConfiguration.set("spark.log.level", "WARN");
        sparkConfiguration.set("spark.sql.warehouse.dir", "file:////tmp/spark-warehouse");
        // Local variable intentionally shadows the static field, as in the original test.
        SparkSession sparkSession = TestData.SPARK.sparkSession((SparkConf)sparkConfiguration);

        // Expected record layout: two nullable string columns.
        ArrayList<ImmutableSchemaFieldDescription> fieldsDescriptions = new ArrayList<ImmutableSchemaFieldDescription>();
        fieldsDescriptions.add(ImmutableSchemaFieldDescription.builder().name("id").type("java.lang.String").pattern("").isNullable(true).build());
        fieldsDescriptions.add(ImmutableSchemaFieldDescription.builder().name("name").type("java.lang.String").pattern("").isNullable(true).build());

        ImmutableTKafkaConfiguration tKafkaConfiguration = ImmutableTKafkaConfiguration.builder().brokerList("localhost:9092").isSchemaRegistry(Boolean.valueOf(true)).schemaRegistryUrl("http://localhost:8085").build();
        ImmutableTKafkaInputAvro tKafkaInputAvroDataset = ImmutableTKafkaInputAvro.builder().sparkSession(sparkSession).groupId("groupId4").topic("testdeux").startingOffset("earliest").tKafkaConfiguration((TKafkaConfiguration)tKafkaConfiguration).schemaFieldDescriptions(fieldsDescriptions).build();
        Dataset rowDataset = tKafkaInputAvroDataset.load();
        // NOTE(review): the expected value "schema" looks like a placeholder; confirm the
        // intended schema string before enabling this test.
        Assertions.assertEquals((Object)"schema", (Object)rowDataset.schema().toString());

        // Checkpoint inside the per-test temp directory. Path.getRoot() would return the
        // filesystem root (e.g. "/") instead of the temp directory itself.
        String checkpointLocation = this.testWorkingFolder.toAbsolutePath().toString();
        StreamingQuery countingQuery = rowDataset.writeStream().queryName("counting").format("memory").option("checkpointLocation", checkpointLocation).outputMode("append").start();
        try {
            for (int i = 0; i < 5; ++i) {
                sparkSession.sql("select * from counting").show();
                sparkSession.sql("SELECT count(*) FROM counting").show();
                TimeUnit.SECONDS.sleep(10L);
            }
        } finally {
            // Do not leave the streaming query running after the test ends.
            countingQuery.stop();
        }
    }

    /**
     * Manual helper (disabled by default) that publishes Avro-encoded records to Kafka.
     *
     * <p>It reads Spark's {@code rate} source at one row per second, fills the thirteen
     * {@code t_*} columns with constants (the two date columns reuse the row timestamp cast to
     * a string), encodes them with {@code to_avro} using the schema below, and writes the result
     * to topic {@code tuj2_topic} on {@code kafka0.weave.local:9092}. It then blocks until the
     * query terminates.
     *
     * @throws StreamingQueryException if the streaming query terminates with an error
     * @throws TimeoutException if starting the streaming query times out
     */
    @Test
    @Disabled(value="For manual dev testing only")
    void sparkProducer() throws StreamingQueryException, TimeoutException {
        String avroSchema = "{ \"type\":\"record\", \"name\":\"DS_row1AvroRecord\", \"namespace\":\"local.kafka_to_console_0_1\", \"fields\":[{\"name\":\"t_Boolean\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"t_Byte\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.Byte\"},\"null\"]},{\"name\":\"t_byteArray\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"byte[]\"},\"null\"]},{\"name\":\"t_Date\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.sql.Timestamp\"},\"null\"]},{\"name\":\"t_TimeStamp\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.sql.Timestamp\"},\"null\"]},{\"name\":\"t_Double\",\"type\":[\"double\",\"null\"]},{\"name\":\"t_Float\",\"type\":[\"float\",\"null\"]},{\"name\":\"t_BigDecimal\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.math.BigDecimal\"},\"null\"]},{\"name\":\"t_Integer\",\"type\":[\"int\",\"null\"]},{\"name\":\"t_Long\",\"type\":[\"long\",\"null\"]},{\"name\":\"t_Short\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.Short\"},\"null\"]},{\"name\":\"t_String\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]},{\"name\":\"t_Char\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]}]}";
        // Column names in schema order; reused for the projection and for the Avro struct.
        String[] fieldNames = new String[]{"t_Boolean", "t_Byte", "t_byteArray", "t_Date", "t_TimeStamp", "t_Double", "t_Float", "t_BigDecimal", "t_Integer", "t_Long", "t_Short", "t_String", "t_Char"};

        Dataset rateStream = sparkSession.readStream().format("rate").option("rowsPerSecond", 1L).load();
        Dataset timestamped = rateStream.selectExpr(new String[]{"CAST(timestamp AS STRING) as ts"});
        Dataset populated = timestamped
                .withColumn("t_Boolean", functions.lit(true))
                .withColumn("t_Byte", functions.lit("1"))
                .withColumn("t_byteArray", functions.lit("dGVzdA=="))
                .withColumn("t_Date", functions.col("ts"))
                .withColumn("t_TimeStamp", functions.col("ts"))
                .withColumn("t_Double", functions.lit(123.456))
                .withColumn("t_Float", functions.lit(Float.valueOf(78.9f)))
                .withColumn("t_BigDecimal", functions.lit("12345.67"))
                .withColumn("t_Integer", functions.lit(42))
                .withColumn("t_Long", functions.lit(999999L))
                .withColumn("t_Short", functions.lit("5"))
                .withColumn("t_String", functions.lit("hello"))
                .withColumn("t_Char", functions.lit("A"));
        Dataset avroData = populated.selectExpr(fieldNames);
        // NOTE(review): "schema" looks like a placeholder expectation; confirm the intended value.
        Assertions.assertEquals("schema", avroData.schema().toString());

        // Wrap all columns in one struct so the whole row becomes the Avro record.
        Column[] avroCols = new Column[fieldNames.length];
        for (int idx = 0; idx < fieldNames.length; idx++) {
            avroCols[idx] = functions.col(fieldNames[idx]);
        }
        Column avroStruct = functions.struct(avroCols);
        Column encodedValue = org.apache.spark.sql.avro.functions.to_avro(avroStruct, avroSchema);
        Dataset avroEncoded = avroData
                .withColumn("value", encodedValue)
                .selectExpr(new String[]{"CAST(null AS STRING) AS key", "value"});

        StreamingQuery kafkaSink = avroEncoded.writeStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka0.weave.local:9092")
                .option("topic", "tuj2_topic")
                .option("checkpointLocation", "/tmp/kafka-avro-checkpoint")
                .start();
        kafkaSink.awaitTermination();
    }

    /**
     * Manual helper (disabled by default) that reads topic {@code tuj2_topic} from
     * {@code kafka0.weave.local:9092} starting at the earliest offsets, decodes the
     * {@code value} column with {@code from_avro} using the schema below, flattens the decoded
     * struct, and prints the rows to the console until the query terminates.
     *
     * @throws TimeoutException if starting the streaming query times out
     * @throws StreamingQueryException if the streaming query terminates with an error
     */
    @Test
    @Disabled(value="For manual dev testing only")
    void sparkConsumer() throws TimeoutException, StreamingQueryException {
        String avroSchema = "{\"type\":\"record\",\"name\":\"DS_row1AvroRecord\",\"namespace\":\"local.kafka_to_console_0_1\",\"fields\":[{\"name\":\"t_Boolean\",\"type\":[\"boolean\",\"null\"]},{\"name\":\"t_Byte\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.Byte\"},\"null\"]},{\"name\":\"t_byteArray\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"byte[]\"},\"null\"]},{\"name\":\"t_Date\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.sql.Timestamp\"},\"null\"]},{\"name\":\"t_TimeStamp\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.sql.Timestamp\"},\"null\"]},{\"name\":\"t_Double\",\"type\":[\"double\",\"null\"]},{\"name\":\"t_Float\",\"type\":[\"float\",\"null\"]},{\"name\":\"t_BigDecimal\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.math.BigDecimal\"},\"null\"]},{\"name\":\"t_Integer\",\"type\":[\"int\",\"null\"]},{\"name\":\"t_Long\",\"type\":[\"long\",\"null\"]},{\"name\":\"t_Short\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.Short\"},\"null\"]},{\"name\":\"t_String\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]},{\"name\":\"t_Char\",\"type\":[{\"type\":\"string\",\"avro.java.string\":\"String\",\"java-class\":\"java.lang.String\"},\"null\"]}]}";

        Dataset kafkaStream = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "kafka0.weave.local:9092")
                .option("subscribe", "tuj2_topic")
                .option("startingOffsets", "earliest")
                .option("mode", "DROPMALFORMED")
                .load();

        // Decode the binary payload into a struct, then expand the struct into top-level columns.
        Column decodedValue = org.apache.spark.sql.avro.functions.from_avro(functions.col("value"), avroSchema).as("decoded_value");
        Dataset decoded = kafkaStream
                .select(new Column[]{decodedValue})
                .select("decoded_value.*", new String[0]);
        // NOTE(review): "schema" looks like a placeholder expectation; confirm the intended value.
        Assertions.assertEquals("schema", decoded.schema().toString());

        StreamingQuery consoleSink = decoded.writeStream()
                .format("console")
                .option("truncate", "false")
                .start();
        consoleSink.awaitTermination();
    }

    /**
     * Loads topic {@code tuj2_topic} through {@link ImmutableTKafkaInputAvro} without a schema
     * registry, using the class-level Avro schema as the specific record schema, and prints the
     * resulting stream to the console until the query terminates. Disabled because it needs a
     * running Kafka broker at {@code kafka0.weave.local:9092}.
     *
     * @throws TimeoutException if starting the streaming query times out
     * @throws StreamingQueryException if the streaming query terminates with an error
     */
    @Test
    @Disabled(value="Needs Kafka to be running")
    void testLoadWithoutRegistry() throws TimeoutException, StreamingQueryException {
        String brokers = "kafka0.weave.local:9092";
        String topicName = "tuj2_topic";
        String fromOffset = "earliest";

        ImmutableTKafkaConfiguration kafkaConfiguration = ImmutableTKafkaConfiguration.builder()
                .brokerList(brokers)
                .build();
        Schema recordSchema = new Schema.Parser().parse(this.avroSchema);

        // The same broker/topic/offset values are also passed as explicit reader options.
        ImmutableTKafkaInputAvro kafkaInput = ImmutableTKafkaInputAvro.builder()
                .sparkSession(sparkSession)
                .topic(topicName)
                .startingOffset(fromOffset)
                .tKafkaConfiguration((TKafkaConfiguration)kafkaConfiguration)
                .schemaFieldDescriptions(new ArrayList())
                .putOptions("kafka.bootstrap.servers", brokers)
                .putOptions("subscribe", topicName)
                .putOptions("startingOffsets", fromOffset)
                .specificRecordSchema(recordSchema)
                .build();

        Dataset kafkaStream = kafkaInput.load();
        // NOTE(review): "schema" looks like a placeholder expectation; confirm the intended value.
        Assertions.assertEquals("schema", kafkaStream.schema().toString());

        StreamingQuery consoleSink = kafkaStream.writeStream()
                .format("console")
                .option("truncate", "false")
                .start();
        consoleSink.awaitTermination();
    }

    /**
     * Checks, against mocked Spark objects, that {@code load()} configures the stream reader
     * with the {@code kafka} format, the configured broker list as
     * {@code kafka.bootstrap.servers}, and the configured topic as {@code subscribe}.
     */
    @Test
    void testWithMocks() {
        // Mocked Spark plumbing: session -> stream reader -> dataset.
        SparkSession mockedSession = Mockito.mock(SparkSession.class);
        DataStreamReader mockedReader = Mockito.mock(DataStreamReader.class);
        Dataset mockedDataset = Mockito.mock(Dataset.class);
        SparkContext mockedContext = Mockito.mock(SparkContext.class);

        Mockito.when(mockedSession.sparkContext()).thenReturn(mockedContext);
        Mockito.when(mockedSession.readStream()).thenReturn(mockedReader);
        // The reader is fluent: every configuration call returns the same mock.
        Mockito.when(mockedReader.format("kafka")).thenReturn(mockedReader);
        Mockito.when(mockedReader.option(ArgumentMatchers.anyString(), ArgumentMatchers.anyString())).thenReturn(mockedReader);
        Mockito.when((Object)mockedReader.load()).thenReturn((Object)mockedDataset);
        Mockito.when((Object)mockedDataset.select(new Column[]{(Column)ArgumentMatchers.any(Column.class)})).thenReturn((Object)mockedDataset);

        ImmutableTKafkaConfiguration kafkaConfiguration = ImmutableTKafkaConfiguration.builder()
                .brokerList("mybroker:9092")
                .build();
        ImmutableTKafkaInputAvro tKafkaInputAvro = ImmutableTKafkaInputAvro.builder()
                .sparkSession(mockedSession)
                .topic("mytopic")
                .tKafkaConfiguration((TKafkaConfiguration)kafkaConfiguration)
                .specificRecordSchema(new Schema.Parser().parse(this.avroSchema))
                .build();
        Assertions.assertNotNull(tKafkaInputAvro);

        tKafkaInputAvro.load();

        Mockito.verify(mockedReader).option("kafka.bootstrap.servers", "mybroker:9092");
        Mockito.verify(mockedReader).option("subscribe", "mytopic");
        Mockito.verify(mockedReader).format("kafka");
    }
}

