/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamReader;
import org.immutables.value.Value;
import org.talend.bigdata.common.Component;
import org.talend.bigdata.structuredstreaming.common.input.StreamReader;
import org.talend.bigdata.structuredstreaming.kafka.TKafkaConfiguration;
import org.talend.bigdata.structuredstreaming.kafka.TKafkaSLLKerberosUtils;
import scala.Predef$;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Map;
import za.co.absa.abris.avro.functions;
import za.co.absa.abris.config.AbrisConfig;
import za.co.absa.abris.config.FromAvroConfig;

@Value.Immutable
public abstract class TKafkaInputAvro
implements Serializable,
Component.WithSchemaDescription,
StreamReader {
    private static final String VALUE = "value";

    public abstract SparkSession sparkSession();

    public abstract TKafkaConfiguration tKafkaConfiguration();

    public abstract java.util.Map<String, String> mappings();

    @Value.Default
    String topic() {
        return "";
    }

    @Value.Default
    String groupId() {
        return "";
    }

    @Value.Default
    String startingOffset() {
        return "latest";
    }

    @Value.Default
    boolean isSetNumberOfRecordsPerSecond() {
        return false;
    }

    @Value.Default
    int numberOfRecordsPerSecond() {
        return 0;
    }

    @Value.Default
    java.util.Map<String, String> kafKaPropertiesMap() {
        return new HashMap<String, String>();
    }

    @Value.Default
    boolean useHierarchicalMode() {
        return false;
    }

    @Value.Default
    boolean failOnDataLoss() {
        return true;
    }

    public abstract Schema specificRecordSchema();

    @Override
    public Dataset<Row> load() {
        DataStreamReader baseOptionsDataStreamReader = this.sparkSession().readStream().format("kafka").option("kafka.bootstrap.servers", this.tKafkaConfiguration().brokerList()).option("subscribe", this.topic()).option("startingOffsets", this.startingOffset()).option("group.id", this.groupId());
        if (!this.failOnDataLoss()) {
            baseOptionsDataStreamReader.option("failOnDataLoss", "false");
        }
        if (this.isSetNumberOfRecordsPerSecond()) {
            baseOptionsDataStreamReader = baseOptionsDataStreamReader.option("maxOffsetsPerTrigger", (long)this.numberOfRecordsPerSecond());
        }
        if (!this.kafKaPropertiesMap().isEmpty()) {
            baseOptionsDataStreamReader.options(this.kafKaPropertiesMap());
        }
        this.setSASLOptions(baseOptionsDataStreamReader);
        this.setKerberosSecurityConfiguration(baseOptionsDataStreamReader);
        Dataset kafkaStream = baseOptionsDataStreamReader.load();
        Dataset rowDataset = Boolean.TRUE.equals(this.tKafkaConfiguration().isSchemaRegistry()) ? this.getAvroFromSchemaRegistry(baseOptionsDataStreamReader) : kafkaStream.select(new Column[]{org.apache.spark.sql.avro.functions.from_avro((Column)org.apache.spark.sql.functions.col((String)VALUE), (String)this.specificRecordSchema().toString()).as("decoded_value")}).select("decoded_value.*", new String[0]);
        return this.withWatermarking(rowDataset);
    }

    private <K, V> Map<K, V> convert(java.util.Map<K, V> m) {
        return ((scala.collection.mutable.Map)JavaConverters$.MODULE$.mapAsScalaMapConverter(m).asScala()).toMap(Predef$.MODULE$.conforms());
    }

    private Dataset<Row> getAvroFromSchemaRegistry(DataStreamReader dataStreamReader) {
        HashMap<String, Object> schemaRegistryMapProperties = new HashMap<String, Object>();
        schemaRegistryMapProperties.put(AbrisConfig.SCHEMA_REGISTRY_URL(), this.tKafkaConfiguration().schemaRegistryUrl());
        if (!this.tKafkaConfiguration().schemaRegistryUsername().isEmpty() && !this.tKafkaConfiguration().schemaRegistryPassword().isEmpty()) {
            schemaRegistryMapProperties.put("basic.auth.credentials.source", "USER_INFO");
            schemaRegistryMapProperties.put("basic.auth.user.info", this.tKafkaConfiguration().schemaRegistryUsername() + ":" + this.tKafkaConfiguration().schemaRegistryPassword());
        }
        if (Boolean.TRUE.equals(this.tKafkaConfiguration().isSchemaRegistryKeystoreSet())) {
            schemaRegistryMapProperties.put("schema.registry.ssl.truststore.type", this.tKafkaConfiguration().schemaRegistryTruststoreType());
            schemaRegistryMapProperties.put("schema.registry.ssl.truststore.location", this.tKafkaConfiguration().schemaRegistryTruststoreLocation());
            schemaRegistryMapProperties.put("schema.registry.ssl.truststore.password", this.tKafkaConfiguration().schemaRegistryTruststorePassword());
        }
        Map schemaRegistryMapPropertiesScala = this.convert(schemaRegistryMapProperties);
        FromAvroConfig fromAvroConfig = AbrisConfig.fromConfluentAvro().downloadReaderSchemaByLatestVersion().andTopicNameStrategy(this.topic(), false).usingSchemaRegistry(schemaRegistryMapPropertiesScala);
        Dataset rowDataset = dataStreamReader.load();
        return rowDataset.select(new Column[]{functions.from_avro((Column)org.apache.spark.sql.functions.col((String)VALUE), (FromAvroConfig)fromAvroConfig).as(VALUE)}).select("value.*", new String[0]);
    }

    private void setSASLOptions(DataStreamReader dataStreamReader) {
        if (this.tKafkaConfiguration().isUseSSLTLS()) {
            dataStreamReader.option("kafka.security.protocol", this.tKafkaConfiguration().securityProtocol());
            dataStreamReader.option("ssl.truststore.type", this.tKafkaConfiguration().sslTruststoreType());
            dataStreamReader.option("ssl.truststore.location", this.tKafkaConfiguration().sslTruststoreLocation());
            dataStreamReader.option("ssl.truststore.password", this.tKafkaConfiguration().sslTruststorePassword());
            TKafkaSLLKerberosUtils.setSASLTrustStoreConfiguration(this.tKafkaConfiguration(), this.sparkSession());
        }
    }

    private void setKerberosSecurityConfiguration(DataStreamReader dataStreamReader) {
        if (this.tKafkaConfiguration().isUseKerberos()) {
            dataStreamReader.option("sasl.kerberos.service.name", this.tKafkaConfiguration().kerberosServiceName());
            if (this.tKafkaConfiguration().isSetKinitCommandPath()) {
                dataStreamReader.option("sasl.kerberos.kinit.cmd", this.tKafkaConfiguration().kerberosKinitCommandPath());
            }
            TKafkaSLLKerberosUtils.setKerberosSecurityConfiguration(this.tKafkaConfiguration(), this.sparkSession());
        }
    }
}

