/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.tahu.host;

import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.exception.TahuErrorCode;
import org.eclipse.tahu.exception.TahuException;
import org.eclipse.tahu.host.CommandPublisher;
import org.eclipse.tahu.host.api.HostApplicationEventHandler;
import org.eclipse.tahu.host.manager.EdgeNodeManager;
import org.eclipse.tahu.host.manager.MetricManager;
import org.eclipse.tahu.host.manager.SparkplugDevice;
import org.eclipse.tahu.host.manager.SparkplugEdgeNode;
import org.eclipse.tahu.host.model.HostApplicationMetricMap;
import org.eclipse.tahu.host.model.HostMetric;
import org.eclipse.tahu.host.model.MessageContext;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.model.DeviceDescriptor;
import org.eclipse.tahu.message.model.EdgeNodeDescriptor;
import org.eclipse.tahu.message.model.MessageType;
import org.eclipse.tahu.message.model.Metric;
import org.eclipse.tahu.message.model.MetricDataType;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.SparkplugDescriptor;
import org.eclipse.tahu.message.model.Topic;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.util.SparkplugUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TahuPayloadHandler {
    private static Logger logger = LoggerFactory.getLogger((String)TahuPayloadHandler.class.getName());
    private static Map<EdgeNodeDescriptor, Timer> rebirthTimers = new ConcurrentHashMap<EdgeNodeDescriptor, Timer>();
    private final HostApplicationEventHandler eventHandler;
    private final CommandPublisher commandPublisher;
    private final PayloadDecoder<SparkplugBPayload> payloadDecoder;

    public TahuPayloadHandler(HostApplicationEventHandler eventHandler, CommandPublisher commandPublisher, PayloadDecoder<SparkplugBPayload> payloadDecoder) {
        this.eventHandler = eventHandler;
        this.commandPublisher = commandPublisher;
        this.payloadDecoder = payloadDecoder;
    }

    public void handlePayload(String topicString, String[] splitTopic, MqttMessage message, MqttServerName mqttServerName, MqttClientId hostAppMqttClientId) {
        Topic topic;
        block20: {
            logger.trace("Handling payload on {}", (Object)topicString);
            topic = null;
            try {
                if (splitTopic.length == 4) {
                    topic = new Topic(splitTopic[0], splitTopic[1], splitTopic[3], MessageType.valueOf(splitTopic[2]));
                    break block20;
                }
                if (splitTopic.length == 5) {
                    topic = new Topic(splitTopic[0], splitTopic[1], splitTopic[3], splitTopic[4], MessageType.valueOf(splitTopic[2]));
                    break block20;
                }
                logger.error("Failed to handle the topic '{}'", (Object)topicString);
                return;
            }
            catch (Exception e) {
                logger.error("Error parsing topic", (Throwable)e);
                return;
            }
        }
        MessageType type = topic.getType();
        SparkplugBPayload payload = null;
        try {
            payload = this.payloadDecoder.buildFromByteArray(message.getPayload(), HostApplicationMetricMap.getInstance().getMetricDataTypeMap(topic.getEdgeNodeDescriptor(), topic.getSparkplugDescriptor()));
            logger.trace("On topic={}: Incoming payload: {}", (Object)topic, (Object)payload);
        }
        catch (Exception e) {
            logger.error("Failed to decode the payload", (Throwable)e);
            return;
        }
        if (type.isCommand()) {
            logger.debug("Ignoring outbound command: {}", (Object)topicString);
            return;
        }
        Long seqNum = null;
        if (!type.equals((Object)MessageType.NDEATH)) {
            if (payload == null || payload.getSeq() == null) {
                logger.error("Invalid payload with topic={}: {}", (Object)topicString, (Object)(payload == null ? "payload is null" : "sequence number is null"));
                return;
            }
            seqNum = payload.getSeq();
            if (seqNum == null) {
                logger.error("Invalid payload missing sequence number: {}", (Object)topicString);
                return;
            }
        }
        try {
            MessageContext messageContext = new MessageContext(mqttServerName, hostAppMqttClientId, topic, payload, message.getPayload() == null ? 0 : message.getPayload().length, seqNum == null ? -1L : seqNum);
            switch (type) {
                case NBIRTH: {
                    logger.info("Handling NBIRTH from {}", (Object)topic.getSparkplugDescriptor());
                    this.handleNodeBirth(messageContext);
                    break;
                }
                case DBIRTH: {
                    logger.info("Handling DBIRTH from {}", (Object)topic.getSparkplugDescriptor());
                    this.handleDeviceBirth(messageContext);
                    break;
                }
                case NDATA: {
                    logger.info("Handling NDATA from {}", (Object)topic.getSparkplugDescriptor());
                    this.handleNodeData(messageContext);
                    break;
                }
                case DDATA: {
                    logger.info("Handling DDATA from {}", (Object)topic.getSparkplugDescriptor());
                    this.handleDeviceData(messageContext);
                    break;
                }
                case NDEATH: {
                    logger.info("Handling NDEATH from {}", (Object)topic.getSparkplugDescriptor());
                    this.handleNodeDeath(messageContext);
                    break;
                }
                case DDEATH: {
                    logger.info("Handling DDEATH from {}", (Object)topic.getSparkplugDescriptor());
                    this.handleDeviceDeath(messageContext);
                    break;
                }
                default: {
                    logger.info("Unknown message with type={} on topic={}", (Object)type, (Object)topic);
                    break;
                }
            }
        }
        catch (Exception e) {
            logger.error("Failed to handle payload on topic: {} with payload={}", new Object[]{topic, payload, e});
            return;
        }
    }

    protected void handleNodeBirth(MessageContext messageContext) throws Exception {
        logger.debug("Processing NBIRTH from Edge Node {} with Seq# {}", (Object)messageContext.getTopic().getEdgeNodeDescriptor(), (Object)messageContext.getSeqNum());
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(messageContext.getTopic().getEdgeNodeDescriptor());
        if (sparkplugEdgeNode == null) {
            sparkplugEdgeNode = EdgeNodeManager.getInstance().addSparkplugEdgeNode(edgeNodeDescriptor, messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId());
        } else {
            sparkplugEdgeNode.clearMetrics();
        }
        HostApplicationMetricMap hostApplicationMetricMap = HostApplicationMetricMap.getInstance();
        hostApplicationMetricMap.clear(sparkplugEdgeNode.getEdgeNodeDescriptor());
        sparkplugEdgeNode.setOnline(true, messageContext.getPayload().getTimestamp(), SparkplugUtil.getBdSequenceNumber(messageContext.getPayload()), messageContext.getSeqNum());
        this.eventHandler.onNodeBirthArrived(edgeNodeDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(edgeNodeDescriptor, messageContext.getMessage());
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (metric.hasAlias() && hostApplicationMetricMap.aliasExists(edgeNodeDescriptor, messageContext.getTopic().getSparkplugDescriptor(), metric.getAlias())) {
                String errorMessage = "Not adding duplicated alias for edgeNode=" + edgeNodeDescriptor + " - alias=" + metric.getAlias() + " and metric name=" + metric.getName() + " - with existing alias for " + hostApplicationMetricMap.getMetricName(edgeNodeDescriptor, messageContext.getTopic().getSparkplugDescriptor(), metric.getAlias());
                logger.error(errorMessage);
                this.requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
                throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, errorMessage);
            }
            hostApplicationMetricMap.addMetric(edgeNodeDescriptor, edgeNodeDescriptor, metric.getName(), metric);
            sparkplugEdgeNode.putMetric(metric.getName(), new HostMetric(metric, false));
            this.eventHandler.onBirthMetric(edgeNodeDescriptor, metric);
        }
        this.eventHandler.onNodeBirthComplete(edgeNodeDescriptor);
    }

    protected void handleDeviceBirth(MessageContext messageContext) throws Exception {
        logger.debug("Processing DBIRTH from Device {} with Seq# {}", (Object)messageContext.getTopic().getSparkplugDescriptor(), (Object)messageContext.getSeqNum());
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        DeviceDescriptor deviceDescriptor = (DeviceDescriptor)messageContext.getTopic().getSparkplugDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
        SparkplugDevice sparkplugDevice = EdgeNodeManager.getInstance().getSparkplugDevice(edgeNodeDescriptor, deviceDescriptor);
        if (sparkplugDevice == null) {
            sparkplugDevice = EdgeNodeManager.getInstance().addSparkplugDevice(edgeNodeDescriptor, deviceDescriptor, messageContext.getPayload().getTimestamp());
        } else {
            sparkplugDevice.clearMetrics();
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        sparkplugDevice.setOnline(true, messageContext.getPayload().getTimestamp());
        this.eventHandler.onDeviceBirthArrived(deviceDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(deviceDescriptor, messageContext.getMessage());
        HostApplicationMetricMap hostApplicationMetricMap = HostApplicationMetricMap.getInstance();
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (metric.hasAlias() && hostApplicationMetricMap.aliasExists(edgeNodeDescriptor, deviceDescriptor, metric.getAlias())) {
                String errorMessage = "Not adding duplicated alias for device=" + deviceDescriptor + " - alias=" + metric.getAlias() + " and metric name=" + metric.getName() + " - with existing alias for " + hostApplicationMetricMap.getMetricName(edgeNodeDescriptor, deviceDescriptor, metric.getAlias());
                logger.error(errorMessage);
                this.requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
                throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, errorMessage);
            }
            hostApplicationMetricMap.addMetric(edgeNodeDescriptor, deviceDescriptor, metric.getName(), metric);
            sparkplugDevice.putMetric(metric.getName(), new HostMetric(metric, false));
            this.eventHandler.onBirthMetric(deviceDescriptor, metric);
        }
        this.eventHandler.onDeviceBirthComplete(deviceDescriptor);
    }

    protected void handleNodeData(MessageContext messageContext) throws Exception {
        logger.debug("Processing NDATA from Edge Node {} with Seq# {}", (Object)messageContext.getTopic().getEdgeNodeDescriptor(), (Object)messageContext.getSeqNum());
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(messageContext.getTopic().getEdgeNodeDescriptor());
        if (sparkplugEdgeNode == null || !sparkplugEdgeNode.isOnline()) {
            this.requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
            throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, "Invalid state of the Sparkplug Edge Node when receiving a NDATA - " + messageContext.getTopic().getSparkplugDescriptor() + " is offline");
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        this.eventHandler.onNodeDataArrived(edgeNodeDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(edgeNodeDescriptor, messageContext.getMessage());
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (!metric.hasName() && metric.hasAlias()) {
                metric.setName(HostApplicationMetricMap.getInstance().getMetricName(edgeNodeDescriptor, edgeNodeDescriptor, metric.getAlias()));
            }
            sparkplugEdgeNode.updateValue(metric.getName(), metric.getValue());
            this.eventHandler.onDataMetric(edgeNodeDescriptor, metric);
        }
        this.eventHandler.onNodeDataArrived(edgeNodeDescriptor, messageContext.getMessage());
    }

    protected void handleDeviceData(MessageContext messageContext) throws Exception {
        logger.debug("Processing DDATA from Device {} with Seq# {}", (Object)messageContext.getTopic().getSparkplugDescriptor(), (Object)messageContext.getSeqNum());
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        DeviceDescriptor deviceDescriptor = (DeviceDescriptor)messageContext.getTopic().getSparkplugDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
        SparkplugDevice sparkplugDevice = EdgeNodeManager.getInstance().getSparkplugDevice(edgeNodeDescriptor, deviceDescriptor);
        if (sparkplugDevice == null || !sparkplugEdgeNode.isOnline()) {
            this.requestRebirth(messageContext.getMqttServerName(), messageContext.getHostAppMqttClientId(), messageContext.getTopic().getEdgeNodeDescriptor());
            throw new TahuException(TahuErrorCode.INVALID_ARGUMENT, "Invalid state of the Sparkplug Device when receiving a DDATA - " + messageContext.getTopic().getSparkplugDescriptor() + " is offline");
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        this.eventHandler.onDeviceDataArrived(deviceDescriptor, messageContext.getMessage());
        this.eventHandler.onMessage(deviceDescriptor, messageContext.getMessage());
        for (Metric metric : messageContext.getPayload().getMetrics()) {
            if (!metric.hasName() && metric.hasAlias()) {
                metric.setName(HostApplicationMetricMap.getInstance().getMetricName(edgeNodeDescriptor, deviceDescriptor, metric.getAlias()));
            }
            sparkplugDevice.updateValue(metric.getName(), metric.getValue());
            this.eventHandler.onDataMetric(deviceDescriptor, metric);
        }
        this.eventHandler.onDeviceDataComplete(deviceDescriptor);
    }

    protected void handleNodeDeath(MessageContext messageContext) {
        Long incomingBdSeqNum = -1L;
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        try {
            SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
            incomingBdSeqNum = SparkplugUtil.getBdSequenceNumber(messageContext.getPayload());
            if (sparkplugEdgeNode != null && incomingBdSeqNum != null) {
                if (sparkplugEdgeNode.isOnline()) {
                    long birthBdSeqNum = sparkplugEdgeNode.getBirthBdSeqNum();
                    if (birthBdSeqNum == incomingBdSeqNum) {
                        this.eventHandler.onNodeDeath(edgeNodeDescriptor, messageContext.getMessage());
                        this.eventHandler.onMessage(edgeNodeDescriptor, messageContext.getMessage());
                        this.staleTags(edgeNodeDescriptor, sparkplugEdgeNode);
                        sparkplugEdgeNode.setOnline(false, messageContext.getPayload().getTimestamp(), incomingBdSeqNum, null);
                        for (SparkplugDevice sparkplugDevice : sparkplugEdgeNode.getSparkplugDevices().values()) {
                            this.staleTags(sparkplugDevice.getDeviceDescrptor(), sparkplugDevice);
                            sparkplugDevice.setOnline(false, messageContext.getPayload().getTimestamp());
                        }
                        this.eventHandler.onNodeDeathComplete(edgeNodeDescriptor);
                    } else {
                        logger.error("Edge Node bdSeq number mismatch on incoming NDEATH from {} - received {}, expected {} - ignoring NDEATH", new Object[]{edgeNodeDescriptor, incomingBdSeqNum, birthBdSeqNum});
                    }
                } else {
                    logger.error("Edge Node '{}' is not online - ignoring NDEATH", (Object)edgeNodeDescriptor);
                }
            } else {
                logger.error("Unable to find Edge Node or current bdSeq number for NDEATH from {} - ignoring NDEATH", (Object)messageContext.getTopic().getEdgeNodeDescriptor());
            }
        }
        catch (Exception e) {
            logger.error("Sparkplug BD sequence number from {} is missing - ignoring NDEATH", (Object)edgeNodeDescriptor);
        }
    }

    protected void handleDeviceDeath(MessageContext messageContext) throws TahuException {
        EdgeNodeDescriptor edgeNodeDescriptor = messageContext.getTopic().getEdgeNodeDescriptor();
        DeviceDescriptor deviceDescriptor = (DeviceDescriptor)messageContext.getTopic().getSparkplugDescriptor();
        SparkplugEdgeNode sparkplugEdgeNode = EdgeNodeManager.getInstance().getSparkplugEdgeNode(edgeNodeDescriptor);
        SparkplugDevice sparkplugDevice = EdgeNodeManager.getInstance().getSparkplugDevice(edgeNodeDescriptor, deviceDescriptor);
        if (sparkplugDevice == null || !sparkplugEdgeNode.isOnline() || !sparkplugDevice.isOnline()) {
            logger.error("Invalid state of the Sparkplug Device when receiving a DDEATH - " + messageContext.getTopic().getSparkplugDescriptor() + " is offline - ignoring DDEATH");
            return;
        }
        sparkplugEdgeNode.handleSeq(messageContext.getPayload().getSeq());
        if (sparkplugEdgeNode.isOnline() && sparkplugDevice.isOnline()) {
            this.eventHandler.onDeviceDeath(deviceDescriptor, messageContext.getMessage());
            this.eventHandler.onMessage(deviceDescriptor, messageContext.getMessage());
            this.staleTags(deviceDescriptor, sparkplugDevice);
            sparkplugDevice.setOnline(false, messageContext.getPayload().getTimestamp());
            this.eventHandler.onDeviceDeathComplete(deviceDescriptor);
        } else {
            logger.error("Online requirements not met for {} - edgeNode={} and device={} - ignoring DDEATH", new Object[]{deviceDescriptor, sparkplugEdgeNode.isOnline() ? "online" : "offline", sparkplugDevice.isOnline() ? "online" : "offline"});
        }
    }

    private void staleTags(SparkplugDescriptor sparkplugDescriptor, MetricManager metricManager) {
        Set<String> metricNames = metricManager.getMetricNames();
        for (String metricName : metricNames) {
            metricManager.setStale(metricName, true);
            this.eventHandler.onStale(sparkplugDescriptor, metricManager.getMetric(metricName));
        }
    }

    public void requestRebirth(MqttServerName mqttServerName, MqttClientId hostAppMqttClientId, EdgeNodeDescriptor edgeNodeDescriptor) {
        this.requestRebirth(mqttServerName, hostAppMqttClientId, edgeNodeDescriptor, null);
    }

    public void requestRebirth(MqttServerName mqttServerName, MqttClientId hostAppMqttClientId, EdgeNodeDescriptor edgeNodeDescriptor, SparkplugEdgeNode sparkplugEdgeNode) {
        try {
            Timer rebirthDelayTimer = rebirthTimers.get(edgeNodeDescriptor);
            if (rebirthDelayTimer == null) {
                logger.info("Requesting Rebirth from {}", (Object)edgeNodeDescriptor);
                rebirthDelayTimer = new Timer();
                rebirthTimers.put(edgeNodeDescriptor, rebirthDelayTimer);
                rebirthDelayTimer.schedule((TimerTask)new RebirthDelayTask(edgeNodeDescriptor), 5000L);
                SparkplugBPayload cmdPayload = new SparkplugBPayload.SparkplugBPayloadBuilder().setTimestamp(new Date()).addMetric(new Metric.MetricBuilder("Node Control/Rebirth", MetricDataType.Boolean, (Object)true).createMetric()).createPayload();
                Topic cmdTopic = new Topic("spBv1.0", edgeNodeDescriptor, MessageType.NCMD);
                if (sparkplugEdgeNode != null) {
                    sparkplugEdgeNode.forceOffline(new Date());
                    if (mqttServerName != null && sparkplugEdgeNode.getMqttServerName() != null && mqttServerName.equals(sparkplugEdgeNode.getMqttServerName())) {
                        logger.debug("On Rebirth request - Current Engine MQTT Server is unchanged: {}", (Object)mqttServerName);
                    } else {
                        logger.info("On Rebirth request - MQTT Server has changed: new={}, old={}", (Object)mqttServerName, (Object)sparkplugEdgeNode.getMqttServerName());
                    }
                    if (hostAppMqttClientId != null && sparkplugEdgeNode.getHostAppMqttClientId() != null && hostAppMqttClientId.equals(sparkplugEdgeNode.getHostAppMqttClientId())) {
                        logger.debug("On Rebirth request - Current Engine MQTT Client ID is unchanged: {}", (Object)hostAppMqttClientId);
                    } else {
                        logger.info("On Rebirth request - MQTT Client ID has changed: new={}, old={}", (Object)hostAppMqttClientId, (Object)sparkplugEdgeNode.getHostAppMqttClientId());
                    }
                    sparkplugEdgeNode.setMqttServerName(mqttServerName);
                    sparkplugEdgeNode.setHostAppMqttClientId(hostAppMqttClientId);
                    this.publishCommand(mqttServerName, cmdTopic, cmdPayload);
                } else {
                    logger.debug("Current Engine MQTT Server Name for unknown Edge Node: {}", (Object)mqttServerName);
                    logger.debug("Current Engine MQTT Client ID for unknown Edge Node: {}", (Object)hostAppMqttClientId);
                    this.publishCommand(mqttServerName, cmdTopic, cmdPayload);
                }
            } else {
                logger.debug("Not requesting Rebirth since we have in the last 5 seconds");
            }
        }
        catch (Exception e) {
            logger.error("Failed to create Rebirth request", (Throwable)e);
            return;
        }
    }

    private void publishCommand(MqttServerName mqttServerName, Topic topic, SparkplugBPayload payload) throws Exception {
        this.commandPublisher.publishCommand(mqttServerName, topic, payload);
    }

    private class RebirthDelayTask
    extends TimerTask {
        private EdgeNodeDescriptor edgeNodeDescriptor;

        public RebirthDelayTask(EdgeNodeDescriptor edgeNodeDescriptor) {
            this.edgeNodeDescriptor = edgeNodeDescriptor;
        }

        @Override
        public void run() {
            if (rebirthTimers.get(this.edgeNodeDescriptor) != null) {
                ((Timer)rebirthTimers.get(this.edgeNodeDescriptor)).cancel();
                rebirthTimers.remove(this.edgeNodeDescriptor);
            }
        }
    }
}

