/*
 * Decompiled with CFR 0.152.
 */
package org.talend.bigdata.structuredstreaming.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.immutables.value.Value;
import org.talend.bigdata.common.Component;
import org.talend.bigdata.structuredstreaming.kafka.TKafkaConfiguration;
import org.talend.bigdata.structuredstreaming.kafka.TKafkaSLLKerberosUtils;

@Value.Enclosing
public interface TKafkaOutput
extends Component {

    /**
     * Streams the {@code serializedValue} column of an input dataset to a Kafka topic
     * through Spark Structured Streaming.
     *
     * <p>The target topic and the optional checkpoint location come from
     * {@link WithComponentConfiguration}; broker list and security settings come from
     * {@link #tKafkaConfiguration()}.
     */
    @Value.Immutable
    public static abstract class Dataframe
    implements WithComponentConfiguration,
    Serializable {
        /** Dataset whose rows carry the payload in a {@code serializedValue} column. */
        public abstract Dataset<Row> inputDataset();

        /** Connection and security settings of the target Kafka cluster. */
        public abstract TKafkaConfiguration tKafkaConfiguration();

        /** Spark session handed to the SSL/Kerberos helper utilities. */
        public abstract SparkSession sparkSession();

        /**
         * Starts the streaming query that writes every input record to Kafka and blocks
         * until that query terminates.
         *
         * <p>A record whose {@code serializedValue} is {@code null} is written as an empty
         * byte array.
         *
         * @throws TimeoutException declared by the calls that start the streaming query
         * @throws StreamingQueryException declared by the wait for query termination
         */
        // NOTE(review): @Value.Derived is normally used on value-returning attributes;
        // confirm it is intended on this void action method.
        @Value.Derived
        public void write() throws TimeoutException, StreamingQueryException {
            // MapFunction is itself Serializable, so the typed lambda can be shipped to
            // executors without an extra intersection cast.
            MapFunction<Row, byte[]> toPayload = inputRecord -> {
                byte[] value = inputRecord.getAs("serializedValue");
                return value == null ? new byte[0] : value;
            };
            DataStreamWriter<byte[]> dataStreamWriter = this.inputDataset()
                    .map(toPayload, Encoders.BINARY())
                    .writeStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", this.tKafkaConfiguration().brokerList())
                    .option("topic", this.topic());
            this.checkpointLocationOptional()
                    .ifPresent(location -> dataStreamWriter.option("checkpointLocation", location));
            this.setSASLOptions(dataStreamWriter);
            this.setKerberosSecurityConfiguration(dataStreamWriter);
            dataStreamWriter.start().awaitTermination();
        }

        /**
         * Adds the security protocol and truststore options when SSL/TLS is enabled.
         *
         * <p>Every Kafka client setting carries the {@code kafka.} prefix: the Spark Kafka
         * sink only forwards options with that prefix to the underlying producer.
         *
         * @param dataStreamWriter writer that receives the options
         */
        private void setSASLOptions(DataStreamWriter<?> dataStreamWriter) {
            if (this.tKafkaConfiguration().isUseSSLTLS()) {
                dataStreamWriter.option("kafka.security.protocol", this.tKafkaConfiguration().securityProtocol());
                dataStreamWriter.option("kafka.ssl.truststore.type", this.tKafkaConfiguration().sslTruststoreType());
                dataStreamWriter.option("kafka.ssl.truststore.location", this.tKafkaConfiguration().sslTruststoreLocation());
                dataStreamWriter.option("kafka.ssl.truststore.password", this.tKafkaConfiguration().sslTruststorePassword());
                TKafkaSLLKerberosUtils.setSASLTrustStoreConfiguration(this.tKafkaConfiguration(), this.sparkSession());
            }
        }

        /**
         * Adds the Kerberos service name, and the kinit command path when one is set,
         * when Kerberos is enabled.
         *
         * <p>The options carry the {@code kafka.} prefix so that the Spark Kafka sink
         * forwards them to the underlying producer.
         *
         * @param dataStreamWriter writer that receives the options
         */
        private void setKerberosSecurityConfiguration(DataStreamWriter<?> dataStreamWriter) {
            if (this.tKafkaConfiguration().isUseKerberos()) {
                dataStreamWriter.option("kafka.sasl.kerberos.service.name", this.tKafkaConfiguration().kerberosServiceName());
                if (this.tKafkaConfiguration().isSetKinitCommandPath()) {
                    dataStreamWriter.option("kafka.sasl.kerberos.kinit.cmd", this.tKafkaConfiguration().kerberosKinitCommandPath());
                }
                TKafkaSLLKerberosUtils.setKerberosSecurityConfiguration(this.tKafkaConfiguration(), this.sparkSession());
            }
        }
    }

    /**
     * Basic component settings, each with a default value.
     *
     * <p>Nested types and methods of an interface are implicitly {@code public}
     * (and nested types implicitly {@code static}), so those modifiers are omitted.
     */
    interface WithComponentConfiguration
    extends WithAdvanceComponentConfiguration {
        /** Topic name used as the {@code topic} option of the writer; empty by default. */
        @Value.Default
        default String topic() {
            return "";
        }

        /** Partition setting; {@code 0} by default. */
        @Value.Default
        default int partition() {
            return 0;
        }

        /** Key setting; empty by default. */
        @Value.Default
        default String key() {
            return "";
        }

        /** Whether compression is selected; {@code false} by default. */
        @Value.Default
        default boolean isCompressDataSelected() {
            return false;
        }

        /** Compression codec setting; {@link Compression#GZIP} by default. */
        @Value.Default
        default Compression compression() {
            return Compression.GZIP;
        }

        /** Checkpoint location for the streaming query; absent by default. */
        @Value.Default
        default Optional<String> checkpointLocationOptional() {
            return Optional.empty();
        }
    }

    public static interface WithAdvanceComponentConfiguration {
        @Value.Default
        default public Map<String, String> kafKaPropertiesMap() {
            return new HashMap<String, String>();
        }
    }

    /** Compression codecs selectable through {@code compression()}. */
    enum Compression {
        GZIP,
        SNAPPY
    }
}

