package org.eclipse.tahu.host;

import java.util.Date;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.saga.InMemorySagaService;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.exception.TahuErrorCode;
import org.eclipse.tahu.exception.TahuException;
import org.eclipse.tahu.host.api.HostApplicationEventHandler;
import org.eclipse.tahu.host.manager.EdgeNodeManager;
import org.eclipse.tahu.host.manager.MetricManager;
import org.eclipse.tahu.host.manager.SparkplugDevice;
import org.eclipse.tahu.host.manager.SparkplugEdgeNode;
import org.eclipse.tahu.host.model.HostApplicationMetricMap;
import org.eclipse.tahu.host.model.HostMetric;
import org.eclipse.tahu.host.model.MessageContext;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.model.DeviceDescriptor;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.Metric;
import org.eclipse.tahu.message.model.MetricDataType;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.SparkplugDescriptor;
import org.eclipse.tahu.message.model.SparkplugMeta;
import org.eclipse.tahu.message.model.Topic;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.util.SparkplugUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/tahu/host/TahuPayloadHandler.class */
public class TahuPayloadHandler {
    private static Logger logger = LoggerFactory.getLogger(TahuPayloadHandler.class.getName());
    private static Map<EdgeNodeDescriptor, Timer> rebirthTimers = new ConcurrentHashMap();
    private final HostApplicationEventHandler eventHandler;
    private final CommandPublisher commandPublisher;
    private final PayloadDecoder<SparkplugBPayload> payloadDecoder;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/tahu/host/TahuPayloadHandler$RebirthDelayTask.class */
    public class RebirthDelayTask extends TimerTask {
        private EdgeNodeDescriptor edgeNodeDescriptor;

        public RebirthDelayTask(EdgeNodeDescriptor edgeNodeDescriptor) {
            this.edgeNodeDescriptor = edgeNodeDescriptor;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            if (TahuPayloadHandler.rebirthTimers.get(this.edgeNodeDescriptor) != null) {
                ((Timer) TahuPayloadHandler.rebirthTimers.get(this.edgeNodeDescriptor)).cancel();
                TahuPayloadHandler.rebirthTimers.remove(this.edgeNodeDescriptor);
            }
        }
    }

    public TahuPayloadHandler(HostApplicationEventHandler hostApplicationEventHandler, CommandPublisher commandPublisher, PayloadDecoder<SparkplugBPayload> payloadDecoder) {
        this.eventHandler = hostApplicationEventHandler;
        this.commandPublisher = commandPublisher;
        this.payloadDecoder = payloadDecoder;
    }

    public void handlePayload(String str, String[] strArr, MqttMessage mqttMessage, MqttServerName mqttServerName, MqttClientId mqttClientId) {
        Topic topic;
        logger.trace("Handling payload on {}", str);
        try {
            if (strArr.length == 4) {
                topic = new Topic(strArr[0], strArr[1], strArr[3], MessageType.valueOf(strArr[2]));
            } else {
                if (strArr.length != 5) {
                    logger.error("Failed to handle the topic '{}'", str);
                    return;
                }
                topic = new Topic(strArr[0], strArr[1], strArr[3], strArr[4], MessageType.valueOf(strArr[2]));
            }
            MessageType type = topic.getType();
            try {
                SparkplugBPayload buildFromByteArray = this.payloadDecoder.buildFromByteArray(mqttMessage.getPayload(), HostApplicationMetricMap.getInstance().getMetricDataTypeMap(topic.getEdgeNodeDescriptor(), topic.getSparkplugDescriptor()));
                logger.trace("On topic={}: Incoming payload: {}", topic, buildFromByteArray);
                if (type.isCommand()) {
                    logger.debug("Ignoring outbound command: {}", str);
                    return;
                }
                Long l = null;
                if (!type.equals(MessageType.NDEATH)) {
                    if (buildFromByteArray == null || buildFromByteArray.getSeq() == null) {
                        logger.error("Invalid payload with topic={}: {}", str, buildFromByteArray == null ? "payload is null" : "sequence number is null");
                        return;
                    }
                    l = buildFromByteArray.getSeq();
                    if (l == null) {
                        logger.error("Invalid payload missing sequence number: {}", str);
                        return;
                    }
                }
                try {
                    MessageContext messageContext = new MessageContext(mqttServerName, mqttClientId, topic, buildFromByteArray, mqttMessage.getPayload() == null ? 0 : mqttMessage.getPayload().length, l == null ? -1L : l.longValue());
                    switch (type) {
                        case NBIRTH:
                            logger.info("Handling NBIRTH from {}", topic.getSparkplugDescriptor());
                            handleNodeBirth(messageContext);
                            break;
                        case DBIRTH:
                            logger.info("Handling DBIRTH from {}", topic.getSparkplugDescriptor());
                            handleDeviceBirth(messageContext);
                            break;
                        case NDATA:
                            logger.info("Handling NDATA from {}", topic.getSparkplugDescriptor());
                            handleNodeData(messageContext);
                            break;
                        case DDATA:
                            logger.info("Handling DDATA from {}", topic.getSparkplugDescriptor());
                            handleDeviceData(messageContext);
                            break;
                        case NDEATH:
                            logger.info("Handling NDEATH from {}", topic.getSparkplugDescriptor());
                            handleNodeDeath(messageContext);
                            break;
                        case DDEATH:
                            logger.info("Handling DDEATH from {}", topic.getSparkplugDescriptor());
                            handleDeviceDeath(messageContext);
                            break;
                        default:
                            logger.info("Unknown message with type={} on topic={}", type, topic);
                            break;
                    }
                } catch (Exception e) {
                    logger.error("Failed to handle payload on topic: {} with payload={}", new Object[]{topic, buildFromByteArray, e});
                }
            } catch (Exception e2) {
                logger.error("Failed to decode the payload", e2);
            }
        } catch (Exception e3) {
            logger.error("Error parsing topic", e3);
        }
    }

    protected void handleNodeBirth(MessageContext messageContext) throws Exception {
        logger.debug("Processing NBIRTH from Edge Node {} with Seq# {}", messageContext.getTopic().getEdgeNodeDescriptor(), Long.valueOf(messageContext.getSeqNum()));
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(messageContext.getTopic().getEdgeNodeDescriptor());
        if (sparkplugEdgeNode == null) {
            sparkplugEdgeNode = EdgeNodeManager.getInstance().addSparkplugEdgeNode(edgeNodeDescriptor, messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId());
        } else {
            sparkplugEdgeNode.clearMetrics();
        }
        HostApplicationMetricMap hostApplicationMetricMap = HostApplicationMetricMap.getInstance();
        hostApplicationMetricMap.clear(sparkplugEdgeNode.getEdgeNodeDescriptor());
        sparkplugEdgeNode.setOnline(true, messageContext.getPayload().getTimestamp(), SparkplugUtil.getBdSequenceNumber(messageContext.getPayload()), Long.valueOf(messageContext.getSeqNum()));
        this.eventHandler.onNodeBirthArrived(edgeNodeDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(edgeNodeDescriptor, messageContext.getMessage());
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (metric.hasAlias() && hostApplicationMetricMap.aliasExists(edgeNodeDescriptor, messageContext.getTopic().getSparkplugDescriptor(), metric.getAlias().longValue())) {
                String str = "Not adding duplicated alias for edgeNode=" + edgeNodeDescriptor + " - alias=" + metric.getAlias() + " and metric name=" + metric.getName() + " - with existing alias for " + hostApplicationMetricMap.getMetricName(edgeNodeDescriptor, messageContext.getTopic().getSparkplugDescriptor(), metric.getAlias().longValue());
                logger.error(str);
                requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
                throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, str);
            }
            hostApplicationMetricMap.addMetric(edgeNodeDescriptor, edgeNodeDescriptor, metric.getName(), metric);
            sparkplugEdgeNode.putMetric(metric.getName(), new HostMetric(metric, false));
            this.eventHandler.onBirthMetric(edgeNodeDescriptor, metric);
        }
        this.eventHandler.onNodeBirthComplete(edgeNodeDescriptor);
    }

    protected void handleDeviceBirth(MessageContext messageContext) throws Exception {
        logger.debug("Processing DBIRTH from Device {} with Seq# {}", messageContext.getTopic().getSparkplugDescriptor(), Long.valueOf(messageContext.getSeqNum()));
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        DeviceDescriptor deviceDescriptor = (DeviceDescriptor) messageContext.getTopic().getSparkplugDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
        SparkplugDevice sparkplugDevice = EdgeNodeManager.getInstance().getSparkplugDevice(edgeNodeDescriptor, deviceDescriptor);
        if (sparkplugDevice == null) {
            sparkplugDevice = EdgeNodeManager.getInstance().addSparkplugDevice(edgeNodeDescriptor, deviceDescriptor, messageContext.getPayload().getTimestamp());
        } else {
            sparkplugDevice.clearMetrics();
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        sparkplugDevice.setOnline(true, messageContext.getPayload().getTimestamp());
        this.eventHandler.onDeviceBirthArrived(deviceDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(deviceDescriptor, messageContext.getMessage());
        HostApplicationMetricMap hostApplicationMetricMap = HostApplicationMetricMap.getInstance();
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (metric.hasAlias() && hostApplicationMetricMap.aliasExists(edgeNodeDescriptor, deviceDescriptor, metric.getAlias().longValue())) {
                String str = "Not adding duplicated alias for device=" + deviceDescriptor + " - alias=" + metric.getAlias() + " and metric name=" + metric.getName() + " - with existing alias for " + hostApplicationMetricMap.getMetricName(edgeNodeDescriptor, deviceDescriptor, metric.getAlias().longValue());
                logger.error(str);
                requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
                throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, str);
            }
            hostApplicationMetricMap.addMetric(edgeNodeDescriptor, deviceDescriptor, metric.getName(), metric);
            sparkplugDevice.putMetric(metric.getName(), new HostMetric(metric, false));
            this.eventHandler.onBirthMetric(deviceDescriptor, metric);
        }
        this.eventHandler.onDeviceBirthComplete(deviceDescriptor);
    }

    protected void handleNodeData(MessageContext messageContext) throws Exception {
        logger.debug("Processing NDATA from Edge Node {} with Seq# {}", messageContext.getTopic().getEdgeNodeDescriptor(), Long.valueOf(messageContext.getSeqNum()));
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(messageContext.getTopic().getEdgeNodeDescriptor());
        if (sparkplugEdgeNode == null || !sparkplugEdgeNode.isOnline()) {
            requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
            throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, "Invalid state of the Sparkplug Edge Node when receiving a NDATA - " + messageContext.getTopic().getSparkplugDescriptor() + " is offline");
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        this.eventHandler.onNodeDataArrived(edgeNodeDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(edgeNodeDescriptor, messageContext.getMessage());
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (!metric.hasName() && metric.hasAlias()) {
                metric.setName(HostApplicationMetricMap.getInstance().getMetricName(edgeNodeDescriptor, edgeNodeDescriptor, metric.getAlias().longValue()));
            }
            sparkplugEdgeNode.updateValue(metric.getName(), metric.getValue());
            this.eventHandler.onDataMetric(edgeNodeDescriptor, metric);
        }
        this.eventHandler.onNodeDataArrived(edgeNodeDescriptor, messageContext.getMessage());
    }

    protected void handleDeviceData(MessageContext messageContext) throws Exception {
        logger.debug("Processing DDATA from Device {} with Seq# {}", messageContext.getTopic().getSparkplugDescriptor(), Long.valueOf(messageContext.getSeqNum()));
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        DeviceDescriptor deviceDescriptor = (DeviceDescriptor) messageContext.getTopic().getSparkplugDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
        SparkplugDevice sparkplugDevice = EdgeNodeManager.getInstance().getSparkplugDevice(edgeNodeDescriptor, deviceDescriptor);
        if (sparkplugDevice == null || !sparkplugEdgeNode.isOnline()) {
            requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
            throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, "Invalid state of the Sparkplug Device when receiving a DDATA - " + messageContext.getTopic().getSparkplugDescriptor() + " is offline");
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        this.eventHandler.onDeviceDataArrived(deviceDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(deviceDescriptor, messageContext.getMessage());
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (!metric.hasName() && metric.hasAlias()) {
                metric.setName(HostApplicationMetricMap.getInstance().getMetricName(edgeNodeDescriptor, deviceDescriptor, metric.getAlias().longValue()));
            }
            sparkplugDevice.updateValue(metric.getName(), metric.getValue());
            this.eventHandler.onDataMetric(deviceDescriptor, metric);
        }
        this.eventHandler.onDeviceDataComplete(deviceDescriptor);
    }

    protected void handleNodeDeath(MessageContext messageContext) {
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        try {
            SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
            Long bdSequenceNumber = SparkplugUtil.getBdSequenceNumber(messageContext.getPayload());
            if (sparkplugEdgeNode == null || bdSequenceNumber == null) {
                logger.error("Unable to find Edge Node or current bdSeq number for NDEATH from {} - ignoring NDEATH", messageContext.getTopic().getEdgeNodeDescriptor());
            } else if (sparkplugEdgeNode.isOnline()) {
                long longValue = sparkplugEdgeNode.getBirthBdSeqNum().longValue();
                if (longValue == bdSequenceNumber.longValue()) {
                    this.eventHandler.onNodeDeath(edgeNodeDescriptor, messageContext.getMessage());
                    this.eventHandler.onMessage(edgeNodeDescriptor, messageContext.getMessage());
                    staleTags(edgeNodeDescriptor, sparkplugEdgeNode);
                    sparkplugEdgeNode.setOnline(false, messageContext.getPayload().getTimestamp(), bdSequenceNumber, null);
                    for (SparkplugDevice sparkplugDevice : sparkplugEdgeNode.getSparkplugDevices().values()) {
                        staleTags(sparkplugDevice.getDeviceDescrptor(), sparkplugDevice);
                        sparkplugDevice.setOnline(false, messageContext.getPayload().getTimestamp());
                    }
                    this.eventHandler.onNodeDeathComplete(edgeNodeDescriptor);
                } else {
                    logger.error("Edge Node bdSeq number mismatch on incoming NDEATH from {} - received {}, expected {} - ignoring NDEATH", new Object[]{edgeNodeDescriptor, bdSequenceNumber, Long.valueOf(longValue)});
                }
            } else {
                logger.error("Edge Node '{}' is not online - ignoring NDEATH", edgeNodeDescriptor);
            }
        } catch (Exception e) {
            logger.error("Sparkplug BD sequence number from {} is missing - ignoring NDEATH", edgeNodeDescriptor);
        }
    }

    protected void handleDeviceDeath(MessageContext messageContext) throws TahuException {
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        DeviceDescriptor deviceDescriptor = (DeviceDescriptor) messageContext.getTopic().getSparkplugDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
        SparkplugDevice sparkplugDevice = EdgeNodeManager.getInstance().getSparkplugDevice(edgeNodeDescriptor, deviceDescriptor);
        if (sparkplugDevice == null || !sparkplugEdgeNode.isOnline() || !sparkplugDevice.isOnline()) {
            logger.error("Invalid state of the Sparkplug Device when receiving a DDEATH - " + messageContext.getTopic().getSparkplugDescriptor() + " is offline - ignoring DDEATH");
            return;
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        if (sparkplugEdgeNode.isOnline() && sparkplugDevice.isOnline()) {
            this.eventHandler.onDeviceDeath(deviceDescriptor, messageContext.getMessage());
            this.eventHandler.onMessage(deviceDescriptor, messageContext.getMessage());
            staleTags(deviceDescriptor, sparkplugDevice);
            sparkplugDevice.setOnline(false, messageContext.getPayload().getTimestamp());
            this.eventHandler.onDeviceDeathComplete(deviceDescriptor);
            return;
        }
        Logger logger2 = logger;
        Object[] objArr = new Object[3];
        objArr[0] = deviceDescriptor;
        objArr[1] = sparkplugEdgeNode.isOnline() ? "online" : "offline";
        objArr[2] = sparkplugDevice.isOnline() ? "online" : "offline";
        logger2.error("Online requirements not met for {} - edgeNode={} and device={} - ignoring DDEATH", objArr);
    }

    private void staleTags(SparkplugDescriptor sparkplugDescriptor, MetricManager metricManager) {
        for (String str : metricManager.getMetricNames()) {
            metricManager.setStale(str, true);
            this.eventHandler.onStale(sparkplugDescriptor, metricManager.getMetric(str));
        }
    }

    public void requestRebirth(MqttServerName mqttServerName, MqttClientId mqttClientId, EdgeNodeDescriptor edgeNodeDescriptor) {
        requestRebirth(mqttServerName, mqttClientId, edgeNodeDescriptor, null);
    }

    public void requestRebirth(MqttServerName mqttServerName, MqttClientId mqttClientId, EdgeNodeDescriptor edgeNodeDescriptor, SparkplugEdgeNode sparkplugEdgeNode) {
        try {
            if (rebirthTimers.get(edgeNodeDescriptor) == null) {
                logger.info("Requesting Rebirth from {}", edgeNodeDescriptor);
                Timer timer = new Timer();
                rebirthTimers.put(edgeNodeDescriptor, timer);
                timer.schedule(new RebirthDelayTask(edgeNodeDescriptor), InMemorySagaService.DEFAULT_RETRY_DELAY_IN_MILLISECONDS);
                SparkplugBPayload createPayload = new SparkplugBPayload.SparkplugBPayloadBuilder().setTimestamp(new Date()).addMetric(new Metric.MetricBuilder(SparkplugMeta.METRIC_NODE_REBIRTH, MetricDataType.Boolean, (Object) true).createMetric()).createPayload();
                Topic topic = new Topic(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX, edgeNodeDescriptor, MessageType.NCMD);
                if (sparkplugEdgeNode != null) {
                    sparkplugEdgeNode.forceOffline(new Date());
                    if (mqttServerName == null || sparkplugEdgeNode.getMqttServerName() == null || !mqttServerName.equals(sparkplugEdgeNode.getMqttServerName())) {
                        logger.info("On Rebirth request - MQTT Server has changed: new={}, old={}", mqttServerName, sparkplugEdgeNode.getMqttServerName());
                    } else {
                        logger.debug("On Rebirth request - Current Engine MQTT Server is unchanged: {}", mqttServerName);
                    }
                    if (mqttClientId == null || sparkplugEdgeNode.getHostAppMqttClientId() == null || !mqttClientId.equals(sparkplugEdgeNode.getHostAppMqttClientId())) {
                        logger.info("On Rebirth request - MQTT Client ID has changed: new={}, old={}", mqttClientId, sparkplugEdgeNode.getHostAppMqttClientId());
                    } else {
                        logger.debug("On Rebirth request - Current Engine MQTT Client ID is unchanged: {}", mqttClientId);
                    }
                    sparkplugEdgeNode.setMqttServerName(mqttServerName);
                    sparkplugEdgeNode.setHostAppMqttClientId(mqttClientId);
                    publishCommand(mqttServerName, topic, createPayload);
                } else {
                    logger.debug("Current Engine MQTT Server Name for unknown Edge Node: {}", mqttServerName);
                    logger.debug("Current Engine MQTT Client ID for unknown Edge Node: {}", mqttClientId);
                    publishCommand(mqttServerName, topic, createPayload);
                }
            } else {
                logger.debug("Not requesting Rebirth since we have in the last 5 seconds");
            }
        } catch (Exception e) {
            logger.error("Failed to create Rebirth request", e);
        }
    }

    private void publishCommand(MqttServerName mqttServerName, Topic topic, SparkplugBPayload sparkplugBPayload) throws Exception {
        this.commandPublisher.publishCommand(mqttServerName, topic, sparkplugBPayload);
    }
}
