package org.eclipse.tahu.host;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.tahu.host.api.HostApplicationEventHandler;
import org.eclipse.tahu.host.seq.SequenceReorderManager;
import org.eclipse.tahu.message.PayloadDecoder;
import org.eclipse.tahu.message.model.SparkplugBPayload;
import org.eclipse.tahu.message.model.SparkplugMeta;
import org.eclipse.tahu.message.model.StatePayload;
import org.eclipse.tahu.mqtt.ClientCallback;
import org.eclipse.tahu.mqtt.MqttClientId;
import org.eclipse.tahu.mqtt.MqttServerName;
import org.eclipse.tahu.mqtt.MqttServerUrl;
import org.eclipse.tahu.mqtt.TahuClient;
import org.eclipse.tahu.util.TopicUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/tahu/host/TahuHostCallback.class */
public class TahuHostCallback implements ClientCallback {
    private static Logger logger = LoggerFactory.getLogger(TahuHostCallback.class.getName());
    private static final int DEFAULT_NUM_OF_THREADS = 100;
    private final ThreadPoolExecutor[] sparkplugBExecutors;
    private Map<MqttServerName, TahuClient> tahuClients;
    private final boolean enableSequenceReordering;
    private final HostApplicationEventHandler eventHandler;
    private final CommandPublisher commandPublisher;
    private final SequenceReorderManager sequenceReorderManager;
    private final PayloadDecoder<SparkplugBPayload> payloadDecoder;
    private final String hostId;
    private boolean onlineState;

    public TahuHostCallback(HostApplicationEventHandler hostApplicationEventHandler, CommandPublisher commandPublisher, SequenceReorderManager sequenceReorderManager, PayloadDecoder<SparkplugBPayload> payloadDecoder, String str, boolean z) {
        this.eventHandler = hostApplicationEventHandler;
        this.commandPublisher = commandPublisher;
        if (sequenceReorderManager != null) {
            this.enableSequenceReordering = true;
            this.sequenceReorderManager = sequenceReorderManager;
            this.sequenceReorderManager.start();
        } else {
            this.enableSequenceReordering = false;
            this.sequenceReorderManager = null;
        }
        this.payloadDecoder = payloadDecoder;
        this.hostId = str;
        this.onlineState = z;
        this.sparkplugBExecutors = new ThreadPoolExecutor[100];
        for (int i = 0; i < 100; i++) {
            final String substring = UUID.randomUUID().toString().substring(0, 8);
            this.sparkplugBExecutors[i] = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactory() { // from class: org.eclipse.tahu.host.TahuHostCallback.1
                @Override // java.util.concurrent.ThreadFactory
                public Thread newThread(Runnable runnable) {
                    return new Thread(runnable, String.format("%s-%s", "TahuHostCallback-", substring));
                }
            });
        }
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void shutdown() {
        logger.info("Shutting down TahuHostCallback");
        for (int i = 0; i < 100; i++) {
            try {
                this.sparkplugBExecutors[i].shutdownNow();
            } catch (Exception e) {
                logger.error("Failed to shutdown executor", e);
            }
        }
    }

    public void setMqttClients(Map<MqttServerName, TahuClient> map) {
        this.tahuClients = map;
    }

    public void setOnlineState(boolean z) {
        this.onlineState = z;
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void messageArrived(MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, String str, MqttMessage mqttMessage) {
        try {
            TahuClient tahuClient = this.tahuClients.get(mqttServerName);
            if (tahuClient == null) {
                logger.error("Message arrived on topic {} from unknown client {} on {}", new Object[]{str, mqttClientId, mqttServerName});
                Iterator<Map.Entry<MqttServerName, TahuClient>> it = this.tahuClients.entrySet().iterator();
                while (it.hasNext()) {
                    logger.error("Failed - but found: {}", it.next().getKey());
                }
                return;
            }
            logger.trace("Message arrived on topic {} from client {}", str, mqttClientId);
            if (str == null) {
                logger.error("Invalid null topic");
                return;
            }
            String[] splitTopic = TopicUtil.getSplitTopic(str);
            long nanoTime = System.nanoTime();
            if (!str.startsWith(SparkplugMeta.SPARKPLUG_B_TOPIC_PREFIX)) {
                logger.debug("Received non-Sparkplug message on topic {}", str);
            } else if (splitTopic.length == 3 && splitTopic[1].equals(SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_TOKEN)) {
                StatePayload statePayload = (StatePayload) new ObjectMapper().readValue(new String(mqttMessage.getPayload()), StatePayload.class);
                if (this.hostId != null && !this.hostId.trim().isEmpty() && splitTopic[2].equals(this.hostId)) {
                    if (!statePayload.isOnline().booleanValue() && this.onlineState) {
                        logger.info("This is a offline STATE message from {} - correcting with new online STATE message", splitTopic[2]);
                        tahuClient.publishBirthMessage();
                    } else if (statePayload.isOnline().booleanValue() && !this.onlineState) {
                        logger.info("This is a online STATE message from {} - correcting with new offline STATE message", splitTopic[2]);
                        tahuClient.publishLwt(true);
                    }
                }
            } else {
                int threadPoolExecutorIndex = getThreadPoolExecutorIndex(splitTopic[1] + "/" + splitTopic[3], 100);
                logger.debug("Adding Sparkplug B message to ThreadPoolExecutor {} :: {}", Integer.valueOf(threadPoolExecutorIndex), Integer.valueOf(this.sparkplugBExecutors[threadPoolExecutorIndex].getQueue().size()));
                ThreadPoolExecutor threadPoolExecutor = this.sparkplugBExecutors[threadPoolExecutorIndex];
                if (this.enableSequenceReordering) {
                    logger.trace("Sending the message on {} to the SequenceReorderManager", str);
                    this.sequenceReorderManager.handlePayload(this, threadPoolExecutor, str, splitTopic, mqttMessage, mqttServerName, mqttClientId, nanoTime);
                } else {
                    threadPoolExecutor.execute(() -> {
                        try {
                            try {
                                logger.trace("Sending the message on {} directly to the TahuPayloadHandler", str);
                                new TahuPayloadHandler(this.eventHandler, this.commandPublisher, this.payloadDecoder).handlePayload(str, splitTopic, mqttMessage, mqttServerName, mqttClientId);
                                long nanoTime2 = System.nanoTime() - nanoTime;
                                if (logger.isTraceEnabled()) {
                                    logger.trace("Updating message processing latency {}", Long.valueOf(nanoTime2));
                                }
                            } catch (Throwable th) {
                                logger.error("Failed to handle Sparkplug B message on topic {}", str, th);
                                long nanoTime3 = System.nanoTime() - nanoTime;
                                if (logger.isTraceEnabled()) {
                                    logger.trace("Updating message processing latency {}", Long.valueOf(nanoTime3));
                                }
                            }
                        } catch (Throwable th2) {
                            long nanoTime4 = System.nanoTime() - nanoTime;
                            if (logger.isTraceEnabled()) {
                                logger.trace("Updating message processing latency {}", Long.valueOf(nanoTime4));
                            }
                            throw th2;
                        }
                    });
                }
            }
        } catch (Throwable th) {
            logger.error("Failed to handle message on topic {}", str, th);
        }
    }

    private int getThreadPoolExecutorIndex(String str, int i) {
        return Math.abs(str.hashCode() % i);
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void connectionLost(MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId, Throwable th) {
        logger.warn("Connection Lost to - {} :: {} :: {}", new Object[]{mqttServerName, mqttServerUrl, mqttClientId});
        this.eventHandler.onDisconnect();
        if (th != null) {
            logger.error("Connection lost due to - {}", th.getMessage(), th);
        }
        logger.info("Clear out all connection counts to this MQTT Server");
        this.tahuClients.get(mqttServerName).clearConnectionCount();
        TahuClient tahuClient = this.tahuClients.get(mqttServerName);
        String lwtTopic = this.tahuClients.get(mqttServerName).getLwtTopic();
        if (lwtTopic != null && lwtTopic.startsWith(SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX)) {
            logger.debug("Setting Primary Host ID info tag for {} to offline", lwtTopic.substring(SparkplugMeta.SPARKPLUG_TOPIC_HOST_STATE_PREFIX.length() + 1, lwtTopic.length()));
        }
        if (tahuClient.getAutoReconnect()) {
            tahuClient.connect();
        }
    }

    @Override // org.eclipse.tahu.mqtt.ClientCallback
    public void connectComplete(boolean z, MqttServerName mqttServerName, MqttServerUrl mqttServerUrl, MqttClientId mqttClientId) {
        this.eventHandler.onConnect();
    }

    private void updateEngineInfoDateTag(MqttServerName mqttServerName, String str) {
    }
}
