/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.es;

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.AcknowledgedResponseBase;
import co.elastic.clients.elasticsearch._types.WriteResponseBase;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.DeleteRequest;
import co.elastic.clients.elasticsearch.core.GetRequest;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.MgetRequest;
import co.elastic.clients.elasticsearch.core.MgetResponse;
import co.elastic.clients.elasticsearch.core.MsearchRequest;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.UpdateRequest;
import co.elastic.clients.elasticsearch.core.UpdateResponse;
import co.elastic.clients.elasticsearch.core.msearch.MultiSearchResult;
import co.elastic.clients.elasticsearch.core.search.ResponseBody;
import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest;
import co.elastic.clients.elasticsearch.indices.ExistsRequest;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.concurrent.CompletableFuture;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.es.ElasticsearchConfiguration;
import org.apache.camel.component.es.ElasticsearchEndpoint;
import org.apache.camel.component.es.ElasticsearchOperation;
import org.apache.camel.component.es.ElasticsearchScrollRequestIterator;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.IOHelper;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.client.sniff.SnifferBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous Camel producer that executes an Elasticsearch operation for each exchange.
 *
 * <p>The operation is derived from the message body type, the {@code operation} header or the
 * endpoint configuration (see {@link #resolveOperation(Exchange)}). Requests are sent through an
 * {@link ElasticsearchAsyncClient} built on top of the low-level {@link RestClient}; the result
 * replaces the message body once the returned future completes.
 *
 * <p>When {@code configuration.isDisconnect()} is {@code true} the client is created lazily in
 * {@link #process(Exchange, AsyncCallback)} and closed again by {@link #cleanup(ActionContext)}
 * after each action; otherwise it is created once in {@link #doStart()}.
 */
class ElasticsearchProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchProducer.class);

    /** Endpoint configuration supplying defaults for headers, connection and security settings. */
    protected final ElasticsearchConfiguration configuration;
    /** Guards lazy creation of {@link #client} in {@link #startClient()}. */
    private final Object mutex = new Object();
    /** Low-level REST client; initially taken from the endpoint, may be {@code null} until started. */
    private volatile RestClient client;
    /** Optional node sniffer, only created when {@code configuration.isEnableSniffer()} is set. */
    private Sniffer sniffer;

    /**
     * Creates the producer.
     *
     * @param endpoint      the owning endpoint; its client (possibly {@code null}) is used as the initial client
     * @param configuration the endpoint configuration
     */
    public ElasticsearchProducer(ElasticsearchEndpoint endpoint, ElasticsearchConfiguration configuration) {
        super(endpoint);
        this.configuration = configuration;
        this.client = endpoint.getClient();
    }

    /**
     * Determines which operation to execute for the exchange.
     *
     * <p>Order of precedence: the type of the message body when it is already a built request,
     * then the {@code operation} header, then the operation set on the configuration.
     *
     * @param exchange the current exchange
     * @return the operation to execute, never {@code null}
     * @throws IllegalArgumentException if no operation can be derived from body, header or configuration
     */
    private ElasticsearchOperation resolveOperation(Exchange exchange) {
        Object request = exchange.getIn().getBody();
        if (request instanceof IndexRequest) {
            return ElasticsearchOperation.Index;
        }
        if (request instanceof GetRequest) {
            return ElasticsearchOperation.GetById;
        }
        if (request instanceof MgetRequest) {
            return ElasticsearchOperation.MultiGet;
        }
        if (request instanceof UpdateRequest) {
            return ElasticsearchOperation.Update;
        }
        if (request instanceof BulkRequest) {
            return ElasticsearchOperation.Bulk;
        }
        if (request instanceof DeleteRequest) {
            return ElasticsearchOperation.Delete;
        }
        if (request instanceof SearchRequest) {
            return ElasticsearchOperation.Search;
        }
        if (request instanceof MsearchRequest) {
            return ElasticsearchOperation.MultiSearch;
        }
        if (request instanceof DeleteIndexRequest) {
            return ElasticsearchOperation.DeleteIndex;
        }
        ElasticsearchOperation operationConfig = exchange.getIn().getHeader("operation", ElasticsearchOperation.class);
        if (operationConfig == null) {
            operationConfig = this.configuration.getOperation();
        }
        if (operationConfig == null) {
            throw new IllegalArgumentException("operation value is mandatory");
        }
        return operationConfig;
    }

    /**
     * Executes the resolved Elasticsearch operation for the exchange.
     *
     * <p>Headers {@code indexName}, {@code size}, {@code from} and {@code waitForActiveShards} that
     * are missing on the message are filled in from the configuration before the body is read
     * (presumably they are consumed while the body is converted to a request builder — confirm
     * against the component's type converters).
     *
     * @param exchange the exchange to process
     * @param callback notified when processing is finished
     * @return {@code true} if the exchange was completed synchronously (scroll search or a failure
     *         raised before the request was sent), {@code false} if the callback will be invoked
     *         later with {@code done(false)}
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            // In disconnect mode cleanup() closes and nulls the client, so it is recreated on demand.
            if (this.configuration.isDisconnect() && this.client == null) {
                this.startClient();
            }
            ObjectMapper mapper = new ObjectMapper();
            mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
            RestClientTransport transport = new RestClientTransport(this.client, new JacksonJsonpMapper(mapper));

            Message message = exchange.getIn();
            ElasticsearchOperation operation = this.resolveOperation(exchange);

            // Remember which headers came from the configuration so cleanup() can remove them again.
            boolean configIndexName = false;
            if (message.getHeader("indexName", String.class) == null) {
                message.setHeader("indexName", this.configuration.getIndexName());
                configIndexName = true;
            }
            if (message.getHeader("size", Integer.class) == null) {
                message.setHeader("size", this.configuration.getSize());
            }
            if (message.getHeader("from", Integer.class) == null) {
                message.setHeader("from", this.configuration.getFrom());
            }
            boolean configWaitForActiveShards = false;
            if (message.getHeader("waitForActiveShards", Integer.class) == null) {
                message.setHeader("waitForActiveShards", this.configuration.getWaitForActiveShards());
                configWaitForActiveShards = true;
            }
            Class<?> documentClass = message.getHeader("documentClass", Class.class);
            if (documentClass == null) {
                documentClass = this.configuration.getDocumentClass();
            }

            ActionContext ctx = new ActionContext(exchange, callback, transport, configIndexName, configWaitForActiveShards);
            switch (operation) {
                case Index:
                    this.processIndexAsync(ctx);
                    break;
                case Update:
                    this.processUpdateAsync(ctx, documentClass);
                    break;
                case GetById:
                    this.processGetByIdAsync(ctx, documentClass);
                    break;
                case Bulk:
                    this.processBulkAsync(ctx);
                    break;
                case Delete:
                    this.processDeleteAsync(ctx);
                    break;
                case DeleteIndex:
                    this.processDeleteIndexAsync(ctx);
                    break;
                case Exists:
                    this.processExistsAsync(ctx);
                    break;
                case Search:
                    if (this.processSearch(ctx, documentClass)) {
                        // Scroll search: the callback has already been notified synchronously.
                        return true;
                    }
                    break;
                case MultiSearch:
                    this.processMultiSearchAsync(ctx, documentClass);
                    break;
                case MultiGet:
                    this.processMultiGetAsync(ctx, documentClass);
                    break;
                case Ping:
                    this.processPingAsync(ctx);
                    break;
                default:
                    throw new IllegalArgumentException("operation value '" + operation + "' is not supported");
            }
        }
        catch (Exception e) {
            exchange.setException(e);
            callback.done(true);
            return true;
        }
        return false;
    }

    /**
     * Runs a search, either as a scroll iterator (synchronously) or as an asynchronous request.
     *
     * @param ctx           the action context
     * @param documentClass the class search hits are mapped to
     * @return {@code true} if the exchange was completed synchronously with a scroll iterator as
     *         body, {@code false} if an asynchronous search was started
     * @throws IllegalArgumentException if the body cannot be converted to a {@code SearchRequest.Builder}
     */
    private boolean processSearch(ActionContext ctx, Class<?> documentClass) {
        Message message = ctx.getMessage();
        SearchRequest.Builder searchRequestBuilder = message.getBody(SearchRequest.Builder.class);
        if (searchRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only Map, String or SearchRequest.Builder is allowed as a type");
        }
        boolean useScroll = message.getHeader("useScroll", this.configuration.isUseScroll(), Boolean.class);
        if (!useScroll) {
            this.onComplete((CompletableFuture)ctx.getClient().search(searchRequestBuilder.build(), documentClass).thenApply(ResponseBody::hits), ctx);
            return false;
        }
        // Scroll requests use the blocking client; the iterator itself becomes the message body.
        int scrollKeepAliveMs = message.getHeader("scrollKeepAliveMs", this.configuration.getScrollKeepAliveMs(), Integer.class);
        ElasticsearchScrollRequestIterator scrollRequestIterator = new ElasticsearchScrollRequestIterator(searchRequestBuilder, new ElasticsearchClient(ctx.getTransport()), scrollKeepAliveMs, ctx.getExchange(), documentClass);
        message.setBody(scrollRequestIterator);
        this.cleanup(ctx);
        ctx.getCallback().done(true);
        return true;
    }

    /** Pings the cluster; the body becomes the boolean outcome of the ping. */
    private void processPingAsync(ActionContext ctx) {
        this.onComplete((CompletableFuture)ctx.getClient().ping().thenApply(BooleanResponse::value), ctx);
    }

    /**
     * Executes a multi-get; the body becomes the list of documents of the response.
     *
     * @throws IllegalArgumentException if the body cannot be converted to a {@code MgetRequest.Builder}
     */
    private void processMultiGetAsync(ActionContext ctx, Class<?> documentClass) {
        MgetRequest.Builder mgetRequestBuilder = ctx.getMessage().getBody(MgetRequest.Builder.class);
        if (mgetRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only MgetRequest.Builder is allowed as a type");
        }
        this.onComplete((CompletableFuture)ctx.getClient().mget(mgetRequestBuilder.build(), documentClass).thenApply(MgetResponse::docs), ctx);
    }

    /**
     * Executes a multi-search; the body becomes the list of responses.
     *
     * @throws IllegalArgumentException if the body cannot be converted to a {@code MsearchRequest.Builder}
     */
    private void processMultiSearchAsync(ActionContext ctx, Class<?> documentClass) {
        MsearchRequest.Builder msearchRequestBuilder = ctx.getMessage().getBody(MsearchRequest.Builder.class);
        if (msearchRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only MsearchRequest.Builder is allowed as a type");
        }
        this.onComplete((CompletableFuture)ctx.getClient().msearch(msearchRequestBuilder.build(), documentClass).thenApply(MultiSearchResult::responses), ctx);
    }

    /** Checks whether the index named by the {@code indexName} header exists; the body becomes the boolean result. */
    private void processExistsAsync(ActionContext ctx) {
        ExistsRequest.Builder builder = new ExistsRequest.Builder();
        builder.index(ctx.getMessage().getHeader("indexName", String.class), new String[0]);
        this.onComplete((CompletableFuture)ctx.getClient().indices().exists(builder.build()).thenApply(BooleanResponse::value), ctx);
    }

    /**
     * Deletes an index; the body becomes the {@code acknowledged} flag of the response.
     *
     * @throws IllegalArgumentException if the body cannot be converted to a {@code DeleteIndexRequest.Builder}
     */
    private void processDeleteIndexAsync(ActionContext ctx) {
        DeleteIndexRequest.Builder deleteIndexRequestBuilder = ctx.getMessage().getBody(DeleteIndexRequest.Builder.class);
        if (deleteIndexRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only String or DeleteIndexRequest.Builder is allowed as a type");
        }
        this.onComplete((CompletableFuture)ctx.getClient().indices().delete(deleteIndexRequestBuilder.build()).thenApply(AcknowledgedResponseBase::acknowledged), ctx);
    }

    /**
     * Deletes a document; the body becomes the {@code result} of the write response.
     *
     * @throws IllegalArgumentException if the body cannot be converted to a {@code DeleteRequest.Builder}
     */
    private void processDeleteAsync(ActionContext ctx) {
        DeleteRequest.Builder deleteRequestBuilder = ctx.getMessage().getBody(DeleteRequest.Builder.class);
        if (deleteRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only String or DeleteRequest.Builder is allowed as a type");
        }
        this.onComplete((CompletableFuture)ctx.getClient().delete(deleteRequestBuilder.build()).thenApply(WriteResponseBase::result), ctx);
    }

    /**
     * Executes a bulk request; the body becomes the list of response items.
     *
     * @throws IllegalArgumentException if the body cannot be converted to a {@code BulkRequest.Builder}
     */
    private void processBulkAsync(ActionContext ctx) {
        BulkRequest.Builder bulkRequestBuilder = ctx.getMessage().getBody(BulkRequest.Builder.class);
        if (bulkRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only Iterable or BulkRequest.Builder is allowed as a type");
        }
        this.onComplete((CompletableFuture)ctx.getClient().bulk(bulkRequestBuilder.build()).thenApply(BulkResponse::items), ctx);
    }

    /**
     * Fetches a document by id; the body becomes the full get response.
     *
     * @throws IllegalArgumentException if the body cannot be converted to a {@code GetRequest.Builder}
     */
    private void processGetByIdAsync(ActionContext ctx, Class<?> documentClass) {
        GetRequest.Builder getRequestBuilder = ctx.getMessage().getBody(GetRequest.Builder.class);
        if (getRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. Only String or GetRequest.Builder is allowed as a type");
        }
        this.onComplete(ctx.getClient().get(getRequestBuilder.build(), documentClass), ctx);
    }

    /**
     * Updates a document; the body becomes the id reported by the update response.
     *
     * @throws IllegalArgumentException if the body cannot be converted to an {@code UpdateRequest.Builder}
     */
    private void processUpdateAsync(ActionContext ctx, Class<?> documentClass) {
        UpdateRequest.Builder updateRequestBuilder = ctx.getMessage().getBody(UpdateRequest.Builder.class);
        // Fail with the same descriptive error as the sibling operations instead of a bare NPE.
        if (updateRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. The body could not be converted to UpdateRequest.Builder");
        }
        this.onComplete((CompletableFuture)ctx.getClient().update(updateRequestBuilder.build(), documentClass).thenApply(r -> ((UpdateResponse)r).id()), ctx);
    }

    /**
     * Indexes a document; the body becomes the id reported by the index response.
     *
     * @throws IllegalArgumentException if the body cannot be converted to an {@code IndexRequest.Builder}
     */
    private void processIndexAsync(ActionContext ctx) {
        IndexRequest.Builder indexRequestBuilder = ctx.getMessage().getBody(IndexRequest.Builder.class);
        // Fail with the same descriptive error as the sibling operations instead of a bare NPE.
        if (indexRequestBuilder == null) {
            throw new IllegalArgumentException("Wrong body type. The body could not be converted to IndexRequest.Builder");
        }
        this.onComplete((CompletableFuture)ctx.getClient().index(indexRequestBuilder.build()).thenApply(WriteResponseBase::id), ctx);
    }

    /**
     * Wires the completion of an asynchronous action to the exchange and its callback.
     *
     * <p>On success the result becomes the message body. On failure the error is wrapped in a
     * {@link CamelExchangeException} and set on the exchange. In both cases
     * {@link #cleanup(ActionContext)} runs before the callback is notified with {@code done(false)},
     * so configuration-derived headers and (in disconnect mode) the connection are released even
     * when the request fails.
     *
     * @param future the pending action
     * @param ctx    the action context
     * @param <T>    the type of the action result
     */
    private <T> void onComplete(CompletableFuture<T> future, ActionContext ctx) {
        Exchange exchange = ctx.getExchange();
        future.thenAccept(result -> exchange.getIn().setBody(result))
            .whenComplete((ignored, error) -> {
                try {
                    this.cleanup(ctx);
                    if (error != null) {
                        exchange.setException(new CamelExchangeException("An error occurred while executing the action", exchange, error));
                    }
                }
                finally {
                    ctx.getCallback().done(false);
                }
            });
    }

    /**
     * Undoes the per-exchange setup done by {@link #process(Exchange, AsyncCallback)}.
     *
     * <p>Removes the {@code indexName} and {@code waitForActiveShards} headers when they were
     * filled in from the configuration, and in disconnect mode closes the transport, the sniffer
     * (if enabled) and the client. Any {@link Exception} is logged and swallowed so that cleanup
     * never fails the exchange.
     *
     * @param ctx the action context
     */
    private void cleanup(ActionContext ctx) {
        try {
            Message message = ctx.getMessage();
            if (ctx.isConfigIndexName()) {
                message.removeHeader("indexName");
            }
            if (ctx.isConfigWaitForActiveShards()) {
                message.removeHeader("waitForActiveShards");
            }
            if (this.configuration.isDisconnect()) {
                IOHelper.close(ctx.getTransport());
                if (this.configuration.isEnableSniffer()) {
                    IOHelper.close(this.sniffer);
                    this.sniffer = null;
                }
                IOHelper.close(this.client);
                this.client = null;
            }
        }
        catch (Exception e) {
            LOG.warn("Could not execute the cleanup task", e);
        }
    }

    /** Starts the producer and, unless disconnect mode is enabled, creates the client eagerly. */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        if (!this.configuration.isDisconnect()) {
            this.startClient();
        }
    }

    /**
     * Creates the client if none exists yet.
     *
     * <p>The null check is repeated while holding {@link #mutex} so that concurrent callers create
     * at most one client. If no host addresses are configured only a warning is logged and the
     * client stays {@code null}.
     */
    private void startClient() {
        if (this.client != null) {
            return;
        }
        synchronized (this.mutex) {
            if (this.client != null) {
                return;
            }
            LOG.info("Connecting to the ElasticSearch cluster: {}", this.configuration.getClusterName());
            if (this.configuration.getHostAddressesList() != null && !this.configuration.getHostAddressesList().isEmpty()) {
                this.client = this.createClient();
            } else {
                LOG.warn("Incorrect ip address and port parameters settings for ElasticSearch cluster");
            }
        }
    }

    /**
     * Builds a {@link RestClient} from the configured hosts, timeouts and credentials, and creates
     * the {@link Sniffer} when sniffing is enabled.
     *
     * @return the newly built client
     */
    private RestClient createClient() {
        RestClientBuilder builder = RestClient.builder(this.configuration.getHostAddressesList().toArray(new HttpHost[0]));
        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setConnectTimeout(this.configuration.getConnectionTimeout()).setSocketTimeout(this.configuration.getSocketTimeout()));
        // NOTE(review): the CA certificate is only applied when both user and password are
        // configured, because the HTTP client callback is registered inside this branch —
        // confirm that certificate-only setups are intentionally unsupported.
        if (this.configuration.getUser() != null && this.configuration.getPassword() != null) {
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(this.configuration.getUser(), this.configuration.getPassword()));
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                if (this.configuration.getCertificatePath() != null) {
                    httpClientBuilder.setSSLContext(this.createSslContextFromCa());
                }
                return httpClientBuilder;
            });
        }
        RestClient restClient = builder.build();
        if (this.configuration.isEnableSniffer()) {
            SnifferBuilder snifferBuilder = Sniffer.builder(restClient);
            snifferBuilder.setSniffIntervalMillis(this.configuration.getSnifferInterval());
            snifferBuilder.setSniffAfterFailureDelayMillis(this.configuration.getSniffAfterFailureDelay());
            this.sniffer = snifferBuilder.build();
        }
        return restClient;
    }

    /**
     * Stops the producer, closing the sniffer and then the client.
     *
     * <p>The sniffer is closed first because it operates on the client, matching the order used in
     * {@link #cleanup(ActionContext)}. The client is closed even if closing the sniffer fails, and
     * {@code super.doStop()} is always invoked.
     */
    @Override
    protected void doStop() throws Exception {
        try {
            if (this.client != null) {
                LOG.info("Disconnecting from ElasticSearch cluster: {}", this.configuration.getClusterName());
                try {
                    if (this.sniffer != null) {
                        this.sniffer.close();
                    }
                }
                finally {
                    this.client.close();
                }
            }
        }
        finally {
            super.doStop();
        }
    }

    /**
     * Returns the low-level REST client currently in use.
     *
     * @return the client, or {@code null} if it has not been created or was closed by cleanup
     */
    public RestClient getClient() {
        return this.client;
    }

    /**
     * Builds an {@link SSLContext} that trusts the X.509 certificate stored at
     * {@code configuration.getCertificatePath()}.
     *
     * @return the initialised TLS context
     * @throws RuntimeException wrapping any failure to read the certificate or set up the context
     */
    private SSLContext createSslContextFromCa() {
        try {
            CertificateFactory factory = CertificateFactory.getInstance("X.509");
            byte[] certificateBytes = Files.readAllBytes(Paths.get(this.configuration.getCertificatePath()));
            Certificate trustedCa = factory.generateCertificate(new ByteArrayInputStream(certificateBytes));
            // An empty in-memory trust store holding only the configured CA.
            KeyStore trustStore = KeyStore.getInstance("pkcs12");
            trustStore.load(null, null);
            trustStore.setCertificateEntry("ca", trustedCa);
            SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
            TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            trustManagerFactory.init(trustStore);
            sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
            return sslContext;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Immutable holder for the state of one action: the exchange, its callback, the transport used
     * for the request, and flags recording which headers were filled in from the configuration.
     */
    private static class ActionContext {
        private final Exchange exchange;
        private final AsyncCallback callback;
        private final ElasticsearchTransport transport;
        private final boolean configIndexName;
        private final boolean configWaitForActiveShards;

        ActionContext(Exchange exchange, AsyncCallback callback, ElasticsearchTransport transport, boolean configIndexName, boolean configWaitForActiveShards) {
            this.exchange = exchange;
            this.callback = callback;
            this.transport = transport;
            this.configIndexName = configIndexName;
            this.configWaitForActiveShards = configWaitForActiveShards;
        }

        /** Returns the transport used for this action. */
        ElasticsearchTransport getTransport() {
            return this.transport;
        }

        /** Returns a new asynchronous client bound to this action's transport. */
        ElasticsearchAsyncClient getClient() {
            return new ElasticsearchAsyncClient(this.transport);
        }

        /** Returns whether the {@code indexName} header was set from the configuration. */
        boolean isConfigIndexName() {
            return this.configIndexName;
        }

        /** Returns whether the {@code waitForActiveShards} header was set from the configuration. */
        boolean isConfigWaitForActiveShards() {
            return this.configWaitForActiveShards;
        }

        /** Returns the exchange being processed. */
        Exchange getExchange() {
            return this.exchange;
        }

        /** Returns the callback to notify when the action is done. */
        AsyncCallback getCallback() {
            return this.callback;
        }

        /** Returns the in-message of the exchange. */
        Message getMessage() {
            return this.exchange.getIn();
        }
    }
}

