/*
 * Decompiled with CFR 0.152.
 */
package org.apache.plc4x.java.s7.readwrite.protocol;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
import org.apache.plc4x.java.api.model.PlcConsumerRegistration;
import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
import org.apache.plc4x.java.s7.events.S7AlarmEvent;
import org.apache.plc4x.java.s7.events.S7CyclicEvent;
import org.apache.plc4x.java.s7.events.S7Event;
import org.apache.plc4x.java.s7.events.S7ModeEvent;
import org.apache.plc4x.java.s7.events.S7SysEvent;
import org.apache.plc4x.java.s7.events.S7UserEvent;
import org.apache.plc4x.java.s7.readwrite.EventType;
import org.apache.plc4x.java.s7.readwrite.utils.S7PlcSubscriptionHandle;
import org.apache.plc4x.java.spi.messages.PlcSubscriber;
import org.apache.plc4x.java.spi.model.DefaultPlcConsumerRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class S7ProtocolEventLogic
implements PlcSubscriber {
    private final Logger logger = LoggerFactory.getLogger(S7ProtocolEventLogic.class);
    private static final int DEFAULT_DELAY = 100;
    private final Map<EventType, Map<PlcConsumerRegistration, Consumer<PlcSubscriptionEvent>>> mapIndex = new HashMap<EventType, Map<PlcConsumerRegistration, Consumer<PlcSubscriptionEvent>>>();
    private final ObjectProcessor runProcessor;
    private final EventDispatcher runDispatcher;
    private final Thread processor;
    private final Thread dispatcher;

    public S7ProtocolEventLogic(BlockingQueue<S7Event> eventQueue) {
        ArrayBlockingQueue<S7Event> dispatchQueue = new ArrayBlockingQueue<S7Event>(1024);
        this.runProcessor = new ObjectProcessor(eventQueue, dispatchQueue);
        this.runDispatcher = new EventDispatcher(dispatchQueue);
        this.processor = new BasicThreadFactory.Builder().namingPattern("plc4x-evt-processor-thread-%d").daemon(true).priority(10).build().newThread(this.runProcessor);
        this.dispatcher = new BasicThreadFactory.Builder().namingPattern("plc4x-evt-dispatcher-thread-%d").daemon(true).priority(10).build().newThread(this.runDispatcher);
    }

    public void start() {
        this.processor.start();
        this.dispatcher.start();
    }

    public void stop() {
        this.runProcessor.doShutdown();
        this.runDispatcher.doShutdown();
    }

    @Override
    public CompletableFuture<PlcSubscriptionResponse> subscribe(PlcSubscriptionRequest subscriptionRequest) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public CompletableFuture<PlcUnsubscriptionResponse> unsubscribe(PlcUnsubscriptionRequest unsubscriptionRequest) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override
    public PlcConsumerRegistration register(Consumer<PlcSubscriptionEvent> consumer, Collection<PlcSubscriptionHandle> handles) {
        Map<Object, Object> mapConsumers;
        S7PlcSubscriptionHandle handle = (S7PlcSubscriptionHandle)handles.toArray()[0];
        if (!this.mapIndex.containsKey((Object)handle.getEventType())) {
            mapConsumers = new HashMap();
            this.mapIndex.put(handle.getEventType(), mapConsumers);
        }
        mapConsumers = this.mapIndex.get((Object)handle.getEventType());
        DefaultPlcConsumerRegistration registration = new DefaultPlcConsumerRegistration(this, consumer, handles.toArray(new PlcSubscriptionHandle[0]));
        mapConsumers.put(registration, consumer);
        return registration;
    }

    @Override
    public void unregister(PlcConsumerRegistration registration) {
        S7PlcSubscriptionHandle handle = (S7PlcSubscriptionHandle)registration.getSubscriptionHandles().get(0);
        Map<PlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> mapConsumers = this.mapIndex.get((Object)handle.getEventType());
        mapConsumers.remove(registration);
    }

    private class EventDispatcher
    implements Runnable {
        private final BlockingQueue<S7Event> dispatchQueue;
        private boolean shutdown = false;
        private S7Event cycDelayedObject = null;

        public EventDispatcher(BlockingQueue<S7Event> dispatchQueue) {
            this.dispatchQueue = dispatchQueue;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(500L);
            }
            catch (InterruptedException ex) {
                S7ProtocolEventLogic.this.logger.warn(ex.toString());
            }
            while (!this.shutdown) {
                try {
                    Map<PlcConsumerRegistration, Consumer<PlcSubscriptionEvent>> mapConsumers;
                    S7Event s7Event = this.dispatchQueue.poll(100L, TimeUnit.MILLISECONDS);
                    if (s7Event == null) continue;
                    if (s7Event instanceof S7ModeEvent) {
                        S7ModeEvent modeEvent = (S7ModeEvent)s7Event;
                        if (!S7ProtocolEventLogic.this.mapIndex.containsKey((Object)EventType.MODE)) continue;
                        mapConsumers = S7ProtocolEventLogic.this.mapIndex.get((Object)EventType.MODE);
                        mapConsumers.forEach((x, y) -> y.accept(modeEvent));
                        continue;
                    }
                    if (s7Event instanceof S7UserEvent) {
                        S7UserEvent userEvent = (S7UserEvent)s7Event;
                        if (!S7ProtocolEventLogic.this.mapIndex.containsKey((Object)EventType.USR)) continue;
                        mapConsumers = S7ProtocolEventLogic.this.mapIndex.get((Object)EventType.USR);
                        mapConsumers.forEach((x, y) -> y.accept(userEvent));
                        continue;
                    }
                    if (s7Event instanceof S7SysEvent) {
                        S7SysEvent sysEvent = (S7SysEvent)s7Event;
                        if (!S7ProtocolEventLogic.this.mapIndex.containsKey((Object)EventType.SYS)) continue;
                        mapConsumers = S7ProtocolEventLogic.this.mapIndex.get((Object)EventType.SYS);
                        mapConsumers.forEach((x, y) -> y.accept(sysEvent));
                        continue;
                    }
                    if (s7Event instanceof S7AlarmEvent) {
                        S7AlarmEvent alarmEvent = (S7AlarmEvent)s7Event;
                        if (!S7ProtocolEventLogic.this.mapIndex.containsKey((Object)EventType.ALM)) continue;
                        mapConsumers = S7ProtocolEventLogic.this.mapIndex.get((Object)EventType.ALM);
                        mapConsumers.forEach((x, y) -> y.accept(alarmEvent));
                        continue;
                    }
                    if (!(s7Event instanceof S7CyclicEvent)) continue;
                    S7CyclicEvent cyclicEvent = (S7CyclicEvent)s7Event;
                    if (S7ProtocolEventLogic.this.mapIndex.containsKey((Object)EventType.CYC)) {
                        mapConsumers = S7ProtocolEventLogic.this.mapIndex.get((Object)EventType.CYC);
                        if (this.cycDelayedObject != null) {
                            mapConsumers.forEach((x, y) -> y.accept(this.cycDelayedObject));
                            this.cycDelayedObject = null;
                        }
                        mapConsumers.forEach((x, y) -> {
                            S7PlcSubscriptionHandle sh = (S7PlcSubscriptionHandle)x.getSubscriptionHandles().get(0);
                            Short id = Short.parseShort(sh.getEventId());
                            if (cyclicEvent.getMap().get("JOBID") == id) {
                                y.accept(cyclicEvent);
                            }
                        });
                        continue;
                    }
                    this.cycDelayedObject = s7Event;
                }
                catch (Exception ex) {
                    java.util.logging.Logger.getLogger(S7ProtocolEventLogic.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            S7ProtocolEventLogic.this.logger.info("EventDispatcher Bye!");
        }

        public void doShutdown() {
            this.shutdown = true;
        }
    }

    private class ObjectProcessor
    implements Runnable {
        private final BlockingQueue<S7Event> eventQueue;
        private final BlockingQueue<S7Event> dispatchQueue;
        private boolean shutdown = false;

        public ObjectProcessor(BlockingQueue<S7Event> eventQueue, BlockingQueue<S7Event> dispatchQueue) {
            this.eventQueue = eventQueue;
            this.dispatchQueue = dispatchQueue;
        }

        @Override
        public void run() {
            while (!this.shutdown) {
                try {
                    S7Event s7Event = this.eventQueue.poll(100L, TimeUnit.MILLISECONDS);
                    if (s7Event == null || this.dispatchQueue.remainingCapacity() <= 1) continue;
                    if (s7Event instanceof S7ModeEvent) {
                        this.dispatchQueue.add(s7Event);
                        continue;
                    }
                    if (s7Event instanceof S7UserEvent) {
                        this.dispatchQueue.add(s7Event);
                        continue;
                    }
                    if (s7Event instanceof S7SysEvent) {
                        this.dispatchQueue.add(s7Event);
                        continue;
                    }
                    if (s7Event instanceof S7CyclicEvent) {
                        this.dispatchQueue.add(s7Event);
                        continue;
                    }
                    this.dispatchQueue.add(s7Event);
                }
                catch (InterruptedException ex) {
                    java.util.logging.Logger.getLogger(S7ProtocolEventLogic.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
            S7ProtocolEventLogic.this.logger.info("ObjectProcessor Bye!");
        }

        public void doShutdown() {
            this.shutdown = true;
        }
    }
}

