/*
 * Decompiled with CFR 0.152.
 */
package org.apache.plc4x.java.opcua.protocol;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.value.PlcValue;
import org.apache.plc4x.java.opcua.context.Conversation;
import org.apache.plc4x.java.opcua.protocol.OpcuaProtocolLogic;
import org.apache.plc4x.java.opcua.readwrite.CreateMonitoredItemsRequest;
import org.apache.plc4x.java.opcua.readwrite.CreateMonitoredItemsResponse;
import org.apache.plc4x.java.opcua.readwrite.DataChangeNotification;
import org.apache.plc4x.java.opcua.readwrite.DataValue;
import org.apache.plc4x.java.opcua.readwrite.DeleteSubscriptionsRequest;
import org.apache.plc4x.java.opcua.readwrite.DeleteSubscriptionsResponse;
import org.apache.plc4x.java.opcua.readwrite.ExtensionObject;
import org.apache.plc4x.java.opcua.readwrite.ExtensionObjectDefinition;
import org.apache.plc4x.java.opcua.readwrite.MonitoredItemCreateRequest;
import org.apache.plc4x.java.opcua.readwrite.MonitoredItemCreateResult;
import org.apache.plc4x.java.opcua.readwrite.MonitoredItemNotification;
import org.apache.plc4x.java.opcua.readwrite.MonitoringMode;
import org.apache.plc4x.java.opcua.readwrite.MonitoringParameters;
import org.apache.plc4x.java.opcua.readwrite.NodeId;
import org.apache.plc4x.java.opcua.readwrite.NotificationMessage;
import org.apache.plc4x.java.opcua.readwrite.OpcuaStatusCode;
import org.apache.plc4x.java.opcua.readwrite.PublishRequest;
import org.apache.plc4x.java.opcua.readwrite.PublishResponse;
import org.apache.plc4x.java.opcua.readwrite.QualifiedName;
import org.apache.plc4x.java.opcua.readwrite.ReadValueId;
import org.apache.plc4x.java.opcua.readwrite.RequestHeader;
import org.apache.plc4x.java.opcua.readwrite.ResponseHeader;
import org.apache.plc4x.java.opcua.readwrite.SubscriptionAcknowledgement;
import org.apache.plc4x.java.opcua.readwrite.TimestampsToReturn;
import org.apache.plc4x.java.opcua.tag.OpcuaTag;
import org.apache.plc4x.java.spi.messages.DefaultPlcSubscriptionEvent;
import org.apache.plc4x.java.spi.messages.utils.ResponseItem;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionHandle;
import org.apache.plc4x.java.spi.model.DefaultPlcSubscriptionTag;
import org.apache.plc4x.java.spi.transaction.RequestTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client-side handle of a single OPC UA subscription.
 *
 * <p>The handle creates one monitored item per subscribed tag, periodically sends publish
 * requests for the subscription and forwards the received data change notifications to all
 * registered consumers as {@link PlcSubscriptionEvent}s.
 *
 * <p>Publish requests are triggered from a shared scheduler thread while responses are handled
 * in the completion callbacks of the conversation, so all state shared between the two
 * (pending acknowledgements, outstanding request counter, consumers, publish task) is kept in
 * thread-safe holders.
 */
public class OpcuaSubscriptionHandle extends DefaultPlcSubscriptionHandle {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpcuaSubscriptionHandle.class);

    /** Single scheduler thread shared by all handles; it only triggers the publish requests. */
    private static final ScheduledExecutorService EXECUTOR = Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "plc4x-opcua-subscription-scheduler"));

    /** Attribute id used for every monitored item (13 is the {@code Value} attribute in the OPC UA attribute id table). */
    private static final long ATTRIBUTE_ID_VALUE = 13L;

    /** A new publish request is only sent while fewer than this many are awaiting a response. */
    private static final long MAX_OUTSTANDING_PUBLISH_REQUESTS = 2L;

    private final Set<Consumer<PlcSubscriptionEvent>> consumers;
    private final List<String> tagNames;
    private final Conversation conversation;
    private final PlcSubscriptionRequest subscriptionRequest;
    private final OpcuaProtocolLogic plcSubscriber;
    private final Long subscriptionId;
    private final long cycleTime;
    private final long revisedCycleTime;
    /** Source of client handles; handle {@code n} belongs to {@code tagNames.get(n - 1)}. */
    private final AtomicLong clientHandles = new AtomicLong(1L);
    private final RequestTransactionManager tm;
    /**
     * Sequence numbers reported by the server that still have to be acknowledged with the next
     * publish request. Guarded by synchronizing on the set itself.
     */
    private final Set<Long> pendingAcknowledgements = new LinkedHashSet<>();
    /** Number of publish requests that were sent and have not completed yet. */
    private final AtomicLong outstandingPublishRequests = new AtomicLong();
    /** Periodic publish task; {@code null} until the monitored items have been created. */
    private volatile ScheduledFuture<?> publishTask;

    /**
     * Creates a handle for an already created subscription.
     *
     * @param plcSubscriber       protocol logic that owns the subscription and decodes values
     * @param tm                  transaction manager used to send publish and delete requests
     * @param conversation        conversation used to build request headers and submit requests
     * @param subscriptionRequest request describing the subscribed tags
     * @param subscriptionId      id of the subscription this handle represents
     * @param cycleTime           cycle time in milliseconds; used as monitoring parameter and as
     *                            period of the publish task
     */
    public OpcuaSubscriptionHandle(OpcuaProtocolLogic plcSubscriber, RequestTransactionManager tm, Conversation conversation, PlcSubscriptionRequest subscriptionRequest, Long subscriptionId, long cycleTime) {
        super(plcSubscriber);
        this.tm = tm;
        // Consumers are registered by callers while notifications are dispatched from response
        // callbacks, hence the synchronized wrapper.
        this.consumers = Collections.synchronizedSet(new HashSet<>());
        this.subscriptionRequest = subscriptionRequest;
        this.tagNames = new ArrayList<>(subscriptionRequest.getTagNames());
        this.conversation = conversation;
        this.subscriptionId = subscriptionId;
        this.plcSubscriber = plcSubscriber;
        this.cycleTime = cycleTime;
        this.revisedCycleTime = cycleTime;
    }

    /**
     * Sends a create-monitored-items request containing one item per subscribed tag and, once the
     * response arrived, schedules the periodic publish requests.
     *
     * <p>Tags rejected by the server are only logged; the subscription is still started for the
     * remaining tags.
     *
     * @return future completed with this handle after the publish task has been scheduled, or
     *         completed exceptionally if the request failed
     */
    public CompletableFuture<OpcuaSubscriptionHandle> onSubscribeCreateMonitoredItemsRequest() {
        List<ExtensionObjectDefinition> requestList = new ArrayList<>(this.tagNames.size());
        for (String tagName : this.tagNames) {
            DefaultPlcSubscriptionTag subscriptionTag = (DefaultPlcSubscriptionTag) this.subscriptionRequest.getTag(tagName);
            NodeId nodeId = OpcuaProtocolLogic.generateNodeId((OpcuaTag) subscriptionTag.getTag());
            ReadValueId readValueId = new ReadValueId(nodeId, ATTRIBUTE_ID_VALUE, OpcuaProtocolLogic.NULL_STRING, new QualifiedName(0, OpcuaProtocolLogic.NULL_STRING));
            MonitoringMode monitoringMode = monitoringModeFor(subscriptionTag);
            // Handles are handed out in tag order so notifications can be mapped back by index.
            long clientHandle = this.clientHandles.getAndIncrement();
            MonitoringParameters parameters = new MonitoringParameters(clientHandle, this.cycleTime, OpcuaProtocolLogic.NULL_EXTENSION_OBJECT, 1L, true);
            requestList.add(new MonitoredItemCreateRequest(readValueId, monitoringMode, parameters));
        }

        RequestHeader requestHeader = this.conversation.createRequestHeader();
        CreateMonitoredItemsRequest createMonitoredItemsRequest = new CreateMonitoredItemsRequest(requestHeader, this.subscriptionId, TimestampsToReturn.timestampsToReturnBoth, requestList.size(), requestList);

        return this.conversation.submit(createMonitoredItemsRequest, CreateMonitoredItemsResponse.class)
            .whenComplete((response, error) -> {
                if (error instanceof TimeoutException) {
                    LOGGER.info("Timeout while sending the Create Monitored Item Subscription Message", error);
                } else if (error != null) {
                    LOGGER.info("Error while sending the Create Monitored Item Subscription Message", error);
                }
            })
            .thenApply(responseMessage -> {
                MonitoredItemCreateResult[] results = responseMessage.getResults().toArray(new MonitoredItemCreateResult[0]);
                for (int index = 0; index < results.length; index++) {
                    if (OpcuaStatusCode.enumForValue(results[index].getStatusCode().getStatusCode()) != OpcuaStatusCode.Good) {
                        LOGGER.error("Invalid Tag {}, subscription created without this tag", this.tagNames.get(index));
                    } else {
                        LOGGER.debug("Tag {} was added to the subscription", this.tagNames.get(index));
                    }
                }
                LOGGER.trace("Scheduling publish event for subscription {}", this.subscriptionId);
                this.publishTask = EXECUTOR.scheduleAtFixedRate(this::sendPublishRequest, this.revisedCycleTime / 2L, this.revisedCycleTime, TimeUnit.MILLISECONDS);
                return this;
            });
    }

    /**
     * Selects the monitoring mode for a tag: cyclic subscriptions use the sampling mode, every
     * other subscription type uses the reporting mode.
     */
    private static MonitoringMode monitoringModeFor(DefaultPlcSubscriptionTag subscriptionTag) {
        switch (subscriptionTag.getPlcSubscriptionType()) {
            case CYCLIC:
                return MonitoringMode.monitoringModeSampling;
            case CHANGE_OF_STATE:
            case EVENT:
            default:
                return MonitoringMode.monitoringModeReporting;
        }
    }

    /**
     * Sends one publish request carrying all pending acknowledgements, unless too many earlier
     * publish requests are still waiting for their response.
     *
     * <p>Invoked periodically by the scheduler. The pending acknowledgements and the outstanding
     * request counter live on the instance so that they survive between invocations.
     */
    private void sendPublishRequest() {
        if (this.outstandingPublishRequests.get() >= MAX_OUTSTANDING_PUBLISH_REQUESTS) {
            LOGGER.trace("Skipping publish request for subscription {}, earlier requests are still outstanding", this.subscriptionId);
            return;
        }

        RequestHeader requestHeader = this.conversation.createRequestHeader(this.revisedCycleTime * 10L);
        List<ExtensionObjectDefinition> acks = drainAcknowledgements();
        PublishRequest publishRequest = new PublishRequest(requestHeader, acks.size(), acks);

        RequestTransactionManager.RequestTransaction transaction = this.tm.startRequest();
        transaction.submit(() -> {
            this.outstandingPublishRequests.incrementAndGet();
            try {
                this.conversation.submit(publishRequest, PublishResponse.class)
                    .thenAccept(this::handlePublishResponse)
                    .whenComplete((result, error) -> {
                        // Release the slot on success and on failure, otherwise failed requests
                        // would block all further publishing.
                        this.outstandingPublishRequests.decrementAndGet();
                        if (error != null) {
                            LOGGER.warn("Publish request of subscription {} resulted in error reported by server", this.subscriptionId, error);
                            transaction.failRequest(error);
                        } else {
                            LOGGER.trace("Completed publish request for subscription {}", this.subscriptionId);
                            transaction.endRequest();
                        }
                    });
            } catch (RuntimeException e) {
                // The request never left, so no completion callback will release the slot.
                this.outstandingPublishRequests.decrementAndGet();
                throw e;
            }
        });
    }

    /**
     * Removes all pending sequence numbers and returns them as acknowledgements for this
     * subscription.
     */
    private List<ExtensionObjectDefinition> drainAcknowledgements() {
        synchronized (this.pendingAcknowledgements) {
            List<ExtensionObjectDefinition> acks = new ArrayList<>(this.pendingAcknowledgements.size());
            for (long sequenceNumber : this.pendingAcknowledgements) {
                acks.add(new SubscriptionAcknowledgement(this.subscriptionId, sequenceNumber));
            }
            this.pendingAcknowledgements.clear();
            return acks;
        }
    }

    /**
     * Records the sequence numbers announced by a publish response for later acknowledgement and
     * dispatches every data change notification contained in it.
     */
    private void handlePublishResponse(PublishResponse responseMessage) {
        synchronized (this.pendingAcknowledgements) {
            for (long availableSequenceNumber : responseMessage.getAvailableSequenceNumbers()) {
                this.pendingAcknowledgements.add(availableSequenceNumber);
            }
        }
        for (ExtensionObject notificationMessage : ((NotificationMessage) responseMessage.getNotificationMessage()).getNotificationData()) {
            ExtensionObjectDefinition notification = notificationMessage.getBody();
            if (notification instanceof DataChangeNotification) {
                LOGGER.trace("Found a Data Change notification");
                List<ExtensionObjectDefinition> items = ((DataChangeNotification) notification).getMonitoredItems();
                onSubscriptionValue(items.toArray(new MonitoredItemNotification[0]));
            } else {
                LOGGER.warn("Unsupported Notification type");
            }
        }
    }

    /**
     * Requests deletion of the subscription on the server.
     *
     * <p>Once the delete request has completed - successfully or not - the periodic publish task
     * is cancelled and the subscription is removed from the owning protocol logic.
     */
    public void stopSubscriber() {
        RequestHeader requestHeader = this.conversation.createRequestHeader(this.revisedCycleTime * 10L);
        List<Long> subscriptions = Collections.singletonList(this.subscriptionId);
        DeleteSubscriptionsRequest deleteSubscriptionRequest = new DeleteSubscriptionsRequest(requestHeader, 1, subscriptions);

        RequestTransactionManager.RequestTransaction transaction = this.tm.startRequest();
        transaction.submit(() -> this.conversation.submit(deleteSubscriptionRequest, DeleteSubscriptionsResponse.class)
            .whenComplete((response, error) -> {
                // Cancel in every case: the subscription is dropped locally below, so the
                // publish task must not keep running when the delete request failed.
                cancelPublishTask();
                if (error != null) {
                    LOGGER.error("Deletion of subscription resulted in error", error);
                    transaction.failRequest(error);
                } else {
                    transaction.endRequest();
                }
                this.plcSubscriber.removeSubscription(this.subscriptionId);
            }));
    }

    /** Cancels the periodic publish task if it has been scheduled. */
    private void cancelPublishTask() {
        ScheduledFuture<?> task = this.publishTask;
        if (task != null) {
            task.cancel(true);
        }
    }

    /**
     * Converts the received monitored item notifications into one subscription event and passes
     * it to every registered consumer.
     *
     * <p>Notifications whose client handle does not belong to one of this handle's tags are
     * skipped with a warning.
     *
     * @param values notifications of one data change notification
     */
    private void onSubscriptionValue(MonitoredItemNotification[] values) {
        LinkedHashSet<String> tagNameList = new LinkedHashSet<>();
        List<DataValue> dataValues = new ArrayList<>(values.length);
        for (MonitoredItemNotification value : values) {
            long clientHandle = value.getClientHandle();
            if (clientHandle < 1L || clientHandle > this.tagNames.size()) {
                LOGGER.warn("Ignoring notification with unknown client handle {} on subscription {}", clientHandle, this.subscriptionId);
                continue;
            }
            tagNameList.add(this.tagNames.get((int) (clientHandle - 1L)));
            dataValues.add(value.getValue());
        }

        Map<String, ResponseItem<PlcValue>> tags = this.plcSubscriber.readResponse(tagNameList, dataValues);
        PlcSubscriptionEvent event = new DefaultPlcSubscriptionEvent(Instant.now(), tags);
        // Iterate over a snapshot so a concurrent register() cannot disturb the dispatch.
        for (Consumer<PlcSubscriptionEvent> consumer : new ArrayList<>(this.consumers)) {
            consumer.accept(event);
        }
    }

    /**
     * Registers a consumer that receives every subsequent subscription event of this handle.
     *
     * @param consumer callback invoked for each event
     * @return registration object tying the consumer to this handle
     */
    @Override
    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer) {
        LOGGER.info("Registering a new OPCUA subscription consumer");
        this.consumers.add(consumer);
        return new DefaultPlcConsumerRegistration(this.plcSubscriber, consumer, this);
    }

    /**
     * Returns the id of the subscription represented by this handle.
     *
     * @return the subscription id passed to the constructor
     */
    public Long getSubscriptionId() {
        return this.subscriptionId;
    }
}

