package org.apache.camel.component.stitch.operations;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.stitch.StitchConfiguration;
import org.apache.camel.component.stitch.StitchConstants;
import org.apache.camel.component.stitch.client.StitchClient;
import org.apache.camel.component.stitch.client.models.StitchMessage;
import org.apache.camel.component.stitch.client.models.StitchRequestBody;
import org.apache.camel.component.stitch.client.models.StitchResponse;
import org.apache.camel.component.stitch.client.models.StitchSchema;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/apache/camel/component/stitch/operations/StitchProducerOperations.class */
/**
 * Turns Camel {@link Message}s into {@link StitchRequestBody} batches and submits them through a
 * {@link StitchClient}.
 *
 * <p>The table name, schema and key names of a request are taken from the message headers
 * ({@link StitchConstants#TABLE_NAME}, {@link StitchConstants#SCHEMA}, {@link StitchConstants#KEY_NAMES})
 * when present and non-empty, and from the {@link StitchConfiguration} otherwise.
 */
public class StitchProducerOperations {
    private static final Logger LOG = LoggerFactory.getLogger(StitchProducerOperations.class);

    private final StitchClient client;
    private final StitchConfiguration configuration;

    /**
     * @param stitchClient        client used to submit batches; must not be {@code null}
     * @param stitchConfiguration fallback source for table name, schema and key names; must not be {@code null}
     */
    public StitchProducerOperations(StitchClient stitchClient, StitchConfiguration stitchConfiguration) {
        ObjectHelper.notNull(stitchClient, "client");
        ObjectHelper.notNull(stitchConfiguration, "configuration");
        this.client = stitchClient;
        this.configuration = stitchConfiguration;
    }

    /**
     * Builds a request from {@code message}, submits it and subscribes to the outcome.
     *
     * <p>On error the throwable is stored on the message's exchange. On both error and completion
     * {@code asyncCallback.done(false)} is invoked. Building the request happens before the
     * subscription, so an unsupported body makes this method throw instead of reaching the callback.
     *
     * @param  message       message whose body and headers describe the batch
     * @param  consumer      receives the {@link StitchResponse} emitted by the client
     * @param  asyncCallback notified with {@code done(false)} once the call has failed or completed
     * @return               always {@code false}
     * @throws IllegalArgumentException if the message body (or a nested element) has an unsupported type
     */
    public boolean sendEvents(Message message, Consumer<StitchResponse> consumer, AsyncCallback asyncCallback) {
        sendAsyncEvents(message).subscribe(consumer, error -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error processing async exchange with error: {}", error.getMessage());
            }
            // Record the failure on the exchange before releasing the callback.
            message.getExchange().setException(error);
            asyncCallback.done(false);
        }, () -> {
            LOG.trace("All events with exchange have been sent successfully.");
            asyncCallback.done(false);
        });
        return false;
    }

    /** Builds the request body for {@code message} and hands it to the client's batch call. */
    private Mono<StitchResponse> sendAsyncEvents(Message message) {
        return this.client.batch(createStitchRequestBody(message));
    }

    /**
     * Creates a {@link StitchRequestBody} from the message body.
     *
     * <p>Supported body types, checked in this order: {@link StitchRequestBody}, {@link StitchMessage},
     * {@link Iterable} and {@link Map}. Table name, schema and key names are then applied from the
     * headers or the configuration.
     *
     * @param  message message to convert
     * @return         the request body ready to be sent
     * @throws IllegalArgumentException if the body is of none of the supported types
     */
    public StitchRequestBody createStitchRequestBody(Message message) {
        final Object body = message.getBody();

        if (body instanceof StitchRequestBody) {
            return createStitchRequestBodyFromStitchRequestBody((StitchRequestBody) body, message);
        }
        if (body instanceof StitchMessage) {
            return createStitchRequestBodyFromStitchMessages(Collections.singletonList((StitchMessage) body), message);
        }
        if (body instanceof Iterable) {
            return createStitchRequestBodyFromIterable((Iterable<?>) body, message);
        }
        if (body instanceof Map) {
            // The key/value types cannot be verified at runtime; the Stitch model expects String keys.
            @SuppressWarnings("unchecked")
            final Map<String, Object> mapBody = (Map<String, Object>) body;
            return createStitchRecordFromMap(mapBody, message);
        }
        throw new IllegalArgumentException("Message body data `" + message.getBody() + "` type is not supported");
    }

    /** Copies an existing request body into a builder and applies the per-message options. */
    private StitchRequestBody createStitchRequestBodyFromStitchRequestBody(StitchRequestBody stitchRequestBody, Message message) {
        return createStitchRecordFromBuilder(StitchRequestBody.fromStitchRequestBody(stitchRequestBody), message);
    }

    /** Wraps the given Stitch messages in a new request body and applies the per-message options. */
    private StitchRequestBody createStitchRequestBodyFromStitchMessages(Collection<StitchMessage> collection, Message message) {
        return createStitchRecordFromBuilder(StitchRequestBody.builder().addMessages(collection), message);
    }

    /**
     * Flattens every element of {@code iterable} into Stitch messages (in iteration order) and
     * wraps them in a single request body.
     */
    private StitchRequestBody createStitchRequestBodyFromIterable(Iterable<?> iterable, Message message) {
        final Collection<StitchMessage> stitchMessages = new LinkedList<>();
        iterable.forEach(element -> addStitchMessages(element, message, stitchMessages));
        return createStitchRequestBodyFromStitchMessages(stitchMessages, message);
    }

    /**
     * Converts one element of an iterable body into zero or more Stitch messages appended to
     * {@code target}.
     *
     * @throws IllegalArgumentException if the element is not a {@link StitchMessage}, {@link Map},
     *                                  {@link StitchRequestBody}, {@link Message} or {@link Exchange}
     */
    private void addStitchMessages(Object element, Message parent, Collection<StitchMessage> target) {
        if (element instanceof StitchMessage) {
            target.add((StitchMessage) element);
        } else if (element instanceof Map) {
            // The key/value types cannot be verified at runtime; the Stitch model expects String keys.
            @SuppressWarnings("unchecked")
            final Map<String, Object> record = (Map<String, Object>) element;
            target.add(StitchMessage.fromMap(record).build());
        } else if (element instanceof StitchRequestBody) {
            target.addAll(((StitchRequestBody) element).getMessages());
        } else if (element instanceof Message) {
            addNestedMessages((Message) element, parent, target);
        } else if (element instanceof Exchange) {
            addNestedMessages(((Exchange) element).getMessage(), parent, target);
        } else {
            throw new IllegalArgumentException("Input data `" + element + "` type is not supported");
        }
    }

    /**
     * Converts a nested Camel message recursively and appends the resulting Stitch messages.
     * The nested message's headers are replaced by the parent's headers first, so the nested
     * conversion resolves the same header options as the parent.
     */
    private void addNestedMessages(Message nested, Message parent, Collection<StitchMessage> target) {
        nested.setHeaders(parent.getHeaders());
        target.addAll(createStitchRequestBody(nested).getMessages());
    }

    /** Builds a request body from a map representation and applies the per-message options. */
    private StitchRequestBody createStitchRecordFromMap(Map<String, Object> map, Message message) {
        return createStitchRecordFromBuilder(StitchRequestBody.fromMap(map), message);
    }

    /** Applies schema, table name and key names (header first, configuration second) and builds. */
    private StitchRequestBody createStitchRecordFromBuilder(StitchRequestBody.Builder builder, Message message) {
        return builder
                .withSchema(getStitchSchema(message))
                .withTableName(getTableName(message))
                .withKeyNames(getKeyNames(message))
                .build();
    }

    /** Resolves the table name from the header, falling back to the configuration. */
    private String getTableName(Message message) {
        return getOption(message, StitchConstants.TABLE_NAME, this.configuration::getTableName, String.class);
    }

    /**
     * Resolves the schema. A non-empty {@link StitchConstants#SCHEMA} header is used when it is a
     * {@link StitchSchema} or a {@link Map} of keywords; in every other case the configured schema
     * is returned.
     */
    private StitchSchema getStitchSchema(Message message) {
        final Object schemaHeader = message.getHeader(StitchConstants.SCHEMA);

        if (ObjectHelper.isNotEmpty(schemaHeader)) {
            if (schemaHeader instanceof StitchSchema) {
                return (StitchSchema) schemaHeader;
            }
            if (schemaHeader instanceof Map) {
                // The key/value types cannot be verified at runtime; keywords are expected as String keys.
                @SuppressWarnings("unchecked")
                final Map<String, Object> keywords = (Map<String, Object>) schemaHeader;
                return StitchSchema.builder().addKeywords(keywords).build();
            }
        }
        return this.configuration.getStitchSchema();
    }

    /**
     * Resolves the key names from the header or the configuration and splits the value on commas.
     * Entries are not trimmed. Returns an empty collection when no value is set.
     */
    private Collection<String> getKeyNames(Message message) {
        final String keys = getOption(message, StitchConstants.KEY_NAMES, this.configuration::getKeyNames, String.class);

        if (ObjectHelper.isNotEmpty(keys)) {
            return Arrays.asList(keys.split(","));
        }
        return Collections.emptyList();
    }

    /**
     * Returns the header {@code str} converted to {@code cls}, or the supplier's value when the
     * message or the header value is empty. The header is looked up only once.
     */
    private <R> R getOption(Message message, String str, Supplier<R> supplier, Class<R> cls) {
        if (ObjectHelper.isEmpty(message)) {
            return supplier.get();
        }
        final R headerValue = message.getHeader(str, cls);
        return ObjectHelper.isEmpty(headerValue) ? supplier.get() : headerValue;
    }
}
