package org.apache.camel.component.tahu;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.tahu.handlers.TahuHostApplication;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.DefaultMessage;
import org.apache.camel.util.ObjectHelper;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.Message;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.Metric;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

/* loaded from: input_file:org/apache/camel/component/tahu/TahuHostConsumer.class */
public class TahuHostConsumer extends DefaultConsumer {
    private final TahuDefaultEndpoint endpoint;
    private final TahuHostApplication tahuHostApplication;
    private final Marker loggingMarker;
    private static final Logger LOG = LoggerFactory.getLogger(TahuHostConsumer.class);
    private static final ConcurrentMap<String, TahuHostApplication> hostHandlers = new ConcurrentHashMap();
    private static final List<MessageType> HANDLED_MESSAGE_TYPES = List.of(MessageType.NBIRTH, MessageType.NDATA, MessageType.NDEATH, MessageType.DBIRTH, MessageType.DDATA, MessageType.DDEATH);

    /* JADX INFO: Access modifiers changed from: package-private */
    public TahuHostConsumer(TahuDefaultEndpoint tahuDefaultEndpoint, Processor processor, String str) {
        super(tahuDefaultEndpoint, processor);
        this.endpoint = tahuDefaultEndpoint;
        this.loggingMarker = MarkerFactory.getMarker(str);
        TahuConfiguration configuration = tahuDefaultEndpoint.getConfiguration();
        this.tahuHostApplication = hostHandlers.computeIfAbsent(str, str2 -> {
            return new TahuHostApplication.HostApplicationBuilder().hostId(str2).serverDefinitions(configuration.getServerDefinitionList()).onMessageConsumer(this::onMessageConsumer).onMetricConsumer(this::onMetricConsumer).build();
        });
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.tahuHostApplication.startup();
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.tahuHostApplication.shutdown();
    }

    void onMessageConsumer(EdgeNodeDescriptor edgeNodeDescriptor, Message message) {
        Exchange exchange = null;
        try {
            try {
                Topic topic = message.getTopic();
                SparkplugBPayload payload = message.getPayload();
                if (HANDLED_MESSAGE_TYPES.contains(topic.getType())) {
                    exchange = createExchange(true);
                    CamelContext context = exchange.getContext();
                    org.apache.camel.Message message2 = (org.apache.camel.Message) ObjectHelper.supplyIfEmpty(exchange.getMessage(), () -> {
                        return new DefaultMessage(context);
                    });
                    exchange.setMessage(message2);
                    message2.setHeader(TahuConstants.MESSAGE_TYPE, topic.getType().name());
                    message2.setHeader(TahuConstants.EDGE_NODE_DESCRIPTOR, edgeNodeDescriptor.getDescriptorString());
                    if (payload.getTimestamp() != null) {
                        message2.setHeader(TahuConstants.MESSAGE_TIMESTAMP, Long.valueOf(payload.getTimestamp().getTime()));
                    }
                    if (payload.getSeq() != null) {
                        message2.setHeader(TahuConstants.MESSAGE_SEQUENCE_NUMBER, payload.getSeq());
                    }
                    if (payload.getUuid() != null) {
                        try {
                            message2.setHeader(TahuConstants.MESSAGE_UUID, UUID.fromString(payload.getUuid()));
                        } catch (IllegalArgumentException e) {
                            LOG.warn(this.loggingMarker, "Exception caught parsing Sparkplug message UUID {} - skipping", payload.getUuid());
                        }
                    }
                    if (payload.getBody() != null) {
                        message2.setBody(payload.getBody(), byte[].class);
                    }
                    Map map = (Map) payload.getMetrics().stream().map(metric -> {
                        return new Object[]{"CamelTahuMetric." + metric.getName(), metric};
                    }).collect(Collectors.toMap(objArr -> {
                        return (String) objArr[0];
                    }, objArr2 -> {
                        return objArr2[1];
                    }));
                    if (!map.isEmpty()) {
                        message2.setHeaders(map);
                    }
                    getProcessor().process(exchange);
                } else {
                    LOG.warn(this.loggingMarker, "TahuHostAppConsumer onMessageConsumer: Unknown Message Type {} from {} - ignoring", topic.getType(), edgeNodeDescriptor);
                }
                if (exchange == null || exchange.getException() == null) {
                    return;
                }
                getExceptionHandler().handleException("Exception caught processing exchange from Sparkplug Message", exchange, exchange.getException());
            } catch (Exception e2) {
                LOG.debug(this.loggingMarker, "Exception caught processing exchange from Sparkplug Message", e2);
                if (0 != 0) {
                    exchange.setException(e2);
                }
                if (0 == 0 || exchange.getException() == null) {
                    return;
                }
                getExceptionHandler().handleException("Exception caught processing exchange from Sparkplug Message", (Exchange) null, exchange.getException());
            }
        } catch (Throwable th) {
            if (0 != 0 && exchange.getException() != null) {
                getExceptionHandler().handleException("Exception caught processing exchange from Sparkplug Message", (Exchange) null, exchange.getException());
            }
            throw th;
        }
    }

    void onMetricConsumer(EdgeNodeDescriptor edgeNodeDescriptor, Metric metric) {
        Exchange exchange = null;
        try {
            try {
                exchange = createExchange(true);
                CamelContext context = exchange.getContext();
                org.apache.camel.Message message = (org.apache.camel.Message) ObjectHelper.supplyIfEmpty(exchange.getMessage(), () -> {
                    return new DefaultMessage(context);
                });
                exchange.setMessage(message);
                message.setHeader(TahuConstants.EDGE_NODE_DESCRIPTOR, edgeNodeDescriptor.getDescriptorString());
                message.setHeader("CamelTahuMetric." + metric.getName(), metric);
                getProcessor().process(exchange);
                if (exchange == null || exchange.getException() == null) {
                    return;
                }
                getExceptionHandler().handleException("Exception caught processing exchange from Sparkplug Metric", exchange, exchange.getException());
            } catch (Exception e) {
                LOG.debug(this.loggingMarker, "Exception caught processing exchange from Sparkplug Metric", e);
                if (exchange != null) {
                    exchange.setException(e);
                }
                if (exchange == null || exchange.getException() == null) {
                    return;
                }
                getExceptionHandler().handleException("Exception caught processing exchange from Sparkplug Metric", exchange, exchange.getException());
            }
        } catch (Throwable th) {
            if (exchange != null && exchange.getException() != null) {
                getExceptionHandler().handleException("Exception caught processing exchange from Sparkplug Metric", exchange, exchange.getException());
            }
            throw th;
        }
    }
}
