/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.neoscada.protocol.iec60870.server.data;

import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.neoscada.protocol.iec60870.apci.MessageSource;
import org.eclipse.neoscada.protocol.iec60870.asdu.ASDUHeader;
import org.eclipse.neoscada.protocol.iec60870.asdu.message.InterrogationCommand;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.ASDUAddress;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Cause;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.CauseOfTransmission;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Causes;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.InformationEntry;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.InformationObjectAddress;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.StandardCause;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value;
import org.eclipse.neoscada.protocol.iec60870.server.data.BackgroundIterator;
import org.eclipse.neoscada.protocol.iec60870.server.data.ChannelWriter;
import org.eclipse.neoscada.protocol.iec60870.server.data.DataListenerImpl;
import org.eclipse.neoscada.protocol.iec60870.server.data.DataModel;
import org.eclipse.neoscada.protocol.iec60870.server.data.DataModuleOptions;
import org.eclipse.neoscada.protocol.iec60870.server.data.FinallyFutureCallback;
import org.eclipse.neoscada.protocol.iec60870.server.data.event.EventQueue;
import org.eclipse.neoscada.protocol.iec60870.server.data.event.SimpleBooleanBuilder;
import org.eclipse.neoscada.protocol.iec60870.server.data.event.SimpleFloatBuilder;
import org.eclipse.neoscada.protocol.iec60870.server.data.event.SimpleScaledBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataModuleMessageSource
implements MessageSource {
    private static final Logger logger = LoggerFactory.getLogger(DataModuleMessageSource.class);

    /** Interrogations that have been started and not yet terminated, keyed by their request. */
    private final Map<InterrogationRequest, InterrogationInstance> currentInterrogation = new HashMap<InterrogationRequest, InterrogationInstance>();

    /** Pending boolean values waiting to be polled. */
    private final EventQueue<Boolean> booleanEventBuffer;

    /** Pending float values waiting to be polled. */
    private final EventQueue<Float> floatEventBuffer;

    /** Pending short (scaled) values waiting to be polled. */
    private final EventQueue<Short> shortEventBuffer;

    /** All event buffers, in the order they are visited by {@code pollFromBuffers()}. */
    private final List<EventQueue<?>> buffers = new ArrayList<EventQueue<?>>();

    /** Messages that {@code poll()} hands out before looking at anything else. */
    private final Queue<Object> messages = new LinkedList<Object>();

    /** Index into {@link #buffers} of the buffer that {@code pollFromBuffers()} visits next. */
    private int bufferIndex;

    private final ChannelWriter writer;
    private final DataModel model;

    /** Iterator of the current background scan, or {@code null} when background scanning is disabled. */
    private BackgroundIterator backgroundIterator;

    /** {@link System#currentTimeMillis()} value captured when the current background scan was created. */
    private long lastBackgroundStart;

    /** Grace period between background scans, in milliseconds; values &lt;= 0 disable the scan. */
    private final long backgroundScanPeriod;

    private final ScheduledExecutorService executor;

    /**
     * Creates the message source, its three event buffers and the first
     * background scan.
     *
     * @param options              supplies the spontaneous-duplicates setting and the timestamp flags for the builders
     * @param executor             used to schedule the background scan grace period and to run interrogation callbacks
     * @param writer               channel writer that is written to and notified when more data becomes available
     * @param model                data model that is read for interrogations and background scans
     * @param backgroundScanPeriod grace period in milliseconds; a value &lt;= 0 disables background scanning
     */
    public DataModuleMessageSource(DataModuleOptions options, ScheduledExecutorService executor, ChannelWriter writer, DataModel model, long backgroundScanPeriod) {
        this.executor = executor;
        this.writer = writer;
        this.model = model;
        this.backgroundScanPeriod = backgroundScanPeriod;

        booleanEventBuffer = new EventQueue<Boolean>(
                options.getSpontaneousDuplicates(),
                new SimpleBooleanBuilder(options.isBooleansWithTimestamp()));
        floatEventBuffer = new EventQueue<Float>(
                options.getSpontaneousDuplicates(),
                new SimpleFloatBuilder(options.isFloatsWithTimestamp()));
        // NOTE(review): the scaled (short) builder is driven by the *float*
        // timestamp flag - confirm this is intended and not a copy/paste slip.
        shortEventBuffer = new EventQueue<Short>(
                options.getSpontaneousDuplicates(),
                new SimpleScaledBuilder(options.isFloatsWithTimestamp()));

        createBackgroundScan();

        // Registration order defines the order in which pollFromBuffers() visits the buffers.
        buffers.add(booleanEventBuffer);
        buffers.add(floatEventBuffer);
        buffers.add(shortEventBuffer);
    }

    /**
     * Returns the next message to transmit, or {@code null} when nothing is
     * pending.
     * <p>
     * Sources are consulted in this order: the explicit message queue, any
     * interrogation that is in the FLUSHING state and has no values left in the
     * event buffers (which yields its ACTIVATION_TERMINATION reply), the event
     * buffers, and finally the background scan.
     *
     * @return the next message, or {@code null} if there is none
     */
    @Override
    public synchronized Object poll() {
        final Object queued = this.messages.poll();
        if (queued != null) {
            return queued;
        }

        // Use an explicit iterator so the finished interrogation is removed
        // through the iterator instead of mutating the map underneath it, and
        // capture the instance before removal instead of reading the entry afterwards.
        final Iterator<Map.Entry<InterrogationRequest, InterrogationInstance>> pending = this.currentInterrogation.entrySet().iterator();
        while (pending.hasNext()) {
            final InterrogationInstance instance = pending.next().getValue();
            if (instance.state != InterrogationState.FLUSHING || this.hasInterrogationValues(instance)) {
                continue;
            }
            pending.remove();
            return DataModuleMessageSource.createCurrentInterrogationReply(instance, StandardCause.ACTIVATION_TERMINATION);
        }

        final Object buffered = this.pollFromBuffers();
        if (buffered != null) {
            return buffered;
        }
        return this.pollFromBackground();
    }

    /**
     * Polls the event buffers in round-robin order, starting at
     * {@code bufferIndex}, and returns the first message found.
     * <p>
     * {@code bufferIndex} is advanced past every buffer that is visited, so the
     * next call starts with the buffer after the one that produced a message.
     *
     * @return the first available message, or {@code null} once every buffer
     *         has been visited without result
     */
    private Object pollFromBuffers() {
        final int firstIndex = bufferIndex;
        while (true) {
            final EventQueue<?> queue = buffers.get(bufferIndex);

            // advance with wrap-around before polling
            final int next = bufferIndex + 1;
            bufferIndex = next >= buffers.size() ? 0 : next;

            final Object candidate = queue.poll();
            if (candidate != null) {
                return candidate;
            }
            if (bufferIndex == firstIndex) {
                // back where we started: all buffers are empty
                return null;
            }
        }
    }

    /**
     * Starts a new background scan when a positive period, a model and an
     * executor are all available; otherwise clears the background iterator.
     * <p>
     * When a scan is started, the start time is recorded and a task is
     * scheduled that notifies the writer once the grace period has elapsed.
     */
    private void createBackgroundScan() {
        logger.info("Start new background scan - grace period: {}", backgroundScanPeriod);

        final boolean enabled = backgroundScanPeriod > 0L && model != null && executor != null;
        if (!enabled) {
            backgroundIterator = null;
            return;
        }

        backgroundIterator = model.createBackgroundIterator();
        lastBackgroundStart = System.currentTimeMillis();

        final Runnable gracePeriodExpired = new Runnable() {
            @Override
            public void run() {
                logger.info("Background scan grace period expired");
                writer.notifyMoreData();
            }
        };
        executor.schedule(gracePeriodExpired, backgroundScanPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Fetches the next message from the background scan, if one is active and
     * its grace period has elapsed.
     * <p>
     * When the iterator has no further message, a new background scan is
     * created and {@code null} is returned for this call.
     *
     * @return the next background message, or {@code null}
     */
    private Object pollFromBackground() {
        if (backgroundIterator == null) {
            return null;
        }

        final long elapsed = System.currentTimeMillis() - lastBackgroundStart;
        logger.trace("pollFromBackground - timeToLast: {}, period: {}", elapsed, backgroundScanPeriod);

        if (elapsed >= backgroundScanPeriod) {
            final Object next = backgroundIterator.nextMessage();
            if (next != null) {
                return next;
            }
            // iterator exhausted: begin the next scan cycle
            createBackgroundScan();
        }
        return null;
    }

    /**
     * Builds an interrogation command reply for the given interrogation.
     *
     * @param currentInterrogation the interrogation supplying source address, ASDU address and qualifier
     * @param cause                the cause to place into the reply's cause of transmission
     * @return the reply message
     */
    private static InterrogationCommand createCurrentInterrogationReply(InterrogationInstance currentInterrogation, Cause cause) {
        final CauseOfTransmission replyCause = new CauseOfTransmission(cause, currentInterrogation.expectedCauseOfTransmission.getSourceAddress());
        final ASDUHeader replyHeader = new ASDUHeader(replyCause, currentInterrogation.asduAddress);
        return new InterrogationCommand(replyHeader, currentInterrogation.qualifierOfInterrogation);
    }

    /**
     * Tells whether any event buffer reports a positive cause counter for the
     * interrogation's expected cause of transmission and ASDU address.
     *
     * @param instance the interrogation to check, may be {@code null}
     * @return {@code true} if at least one buffer has a counter above zero;
     *         {@code false} otherwise or when {@code instance} is {@code null}
     */
    private boolean hasInterrogationValues(InterrogationInstance instance) {
        if (instance != null) {
            for (final EventQueue<?> queue : buffers) {
                if (queue.getCauseCounter(instance.expectedCauseOfTransmission, instance.asduAddress) > 0) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Queues a single boolean value for transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the value is appended
     * to the boolean event buffer, and the writer is notified of new data.
     *
     * @param header  supplies the ASDU address and the cause of transmission
     * @param address information object address of the value
     * @param value   the value to queue
     */
    public synchronized void sendBooleanValue(ASDUHeader header, InformationObjectAddress address, Value<Boolean> value) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        booleanEventBuffer.append(cot, asdu, address, value);
        writer.notifyMoreData();
    }

    /**
     * Queues a list of boolean values, addressed from a start address, for
     * transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the values are
     * appended to the boolean event buffer, and the writer is notified.
     *
     * @param header       supplies the ASDU address and the cause of transmission
     * @param startAddress information object address handed to the buffer together with the values
     * @param values       the values to queue
     */
    public synchronized void sendBooleanValues(ASDUHeader header, InformationObjectAddress startAddress, List<Value<Boolean>> values) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        booleanEventBuffer.append(cot, asdu, startAddress, values);
        writer.notifyMoreData();
    }

    /**
     * Queues a list of individually addressed boolean entries for transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the entries are
     * appended to the boolean event buffer, and the writer is notified.
     *
     * @param header supplies the ASDU address and the cause of transmission
     * @param values the entries to queue
     */
    public synchronized void sendBooleanValues(ASDUHeader header, List<InformationEntry<Boolean>> values) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        booleanEventBuffer.append(cot, asdu, values);
        writer.notifyMoreData();
    }

    /**
     * Queues a single float value for transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the value is appended
     * to the float event buffer, and the writer is notified of new data.
     *
     * @param header  supplies the ASDU address and the cause of transmission
     * @param address information object address of the value
     * @param value   the value to queue
     */
    public synchronized void sendFloatValue(ASDUHeader header, InformationObjectAddress address, Value<Float> value) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        floatEventBuffer.append(cot, asdu, address, value);
        writer.notifyMoreData();
    }

    /**
     * Queues a list of float values, addressed from a start address, for
     * transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the values are
     * appended to the float event buffer, and the writer is notified.
     *
     * @param header       supplies the ASDU address and the cause of transmission
     * @param startAddress information object address handed to the buffer together with the values
     * @param values       the values to queue
     */
    public synchronized void sendFloatValues(ASDUHeader header, InformationObjectAddress startAddress, List<Value<Float>> values) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        floatEventBuffer.append(cot, asdu, startAddress, values);
        writer.notifyMoreData();
    }

    /**
     * Queues a list of individually addressed float entries for transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the entries are
     * appended to the float event buffer, and the writer is notified.
     *
     * @param header supplies the ASDU address and the cause of transmission
     * @param values the entries to queue
     */
    public synchronized void sendFloatValues(ASDUHeader header, List<InformationEntry<Float>> values) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        floatEventBuffer.append(cot, asdu, values);
        writer.notifyMoreData();
    }

    /**
     * Queues a single short value for transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the value is appended
     * to the short event buffer, and the writer is notified of new data.
     *
     * @param header  supplies the ASDU address and the cause of transmission
     * @param address information object address of the value
     * @param value   the value to queue
     */
    public synchronized void sendShortValue(ASDUHeader header, InformationObjectAddress address, Value<Short> value) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        shortEventBuffer.append(cot, asdu, address, value);
        writer.notifyMoreData();
    }

    /**
     * Queues a list of short values, addressed from a start address, for
     * transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the values are
     * appended to the short event buffer, and the writer is notified.
     *
     * @param header       supplies the ASDU address and the cause of transmission
     * @param startAddress information object address handed to the buffer together with the values
     * @param values       the values to queue
     */
    public synchronized void sendShortValues(ASDUHeader header, InformationObjectAddress startAddress, List<Value<Short>> values) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        shortEventBuffer.append(cot, asdu, startAddress, values);
        writer.notifyMoreData();
    }

    /**
     * Queues a list of individually addressed short entries for transmission.
     * <p>
     * The header's cause is passed to {@code recordCause}, the entries are
     * appended to the short event buffer, and the writer is notified.
     *
     * @param header supplies the ASDU address and the cause of transmission
     * @param values the entries to queue
     */
    public synchronized void sendShortValues(ASDUHeader header, List<InformationEntry<Short>> values) {
        final ASDUAddress asdu = header.getAsduAddress();
        final CauseOfTransmission cot = header.getCauseOfTransmission();
        recordCause(asdu, cot);
        shortEventBuffer.append(cot, asdu, values);
        writer.notifyMoreData();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Handles an incoming interrogation command.
     * <p>
     * If the qualifier of interrogation cannot be converted to a cause, the
     * command is mirrored back with UNKNOWN_REASON. Otherwise an interrogation
     * is started for the addressed ASDU or, for a broadcast address, for every
     * ASDU address the model reports. If the model invokes the fallback
     * callback, the command is mirrored back with UNKNOWN_ASDU_ADDRESS.
     *
     * @param msg the received interrogation command
     */
    public void startInterrogation(final InterrogationCommand msg) {
        final short qoi = msg.getQualifierOfInterrogation();
        final Cause cause = convert(qoi);
        if (cause == null) {
            logger.info("Ignoring unknown qualifier of interrogation (QOI) : {}", qoi);
            writer.write(msg.mirror(StandardCause.UNKNOWN_REASON));
            writer.flush();
            return;
        }

        synchronized (this) {
            logger.debug("Starting interrogation");

            if (!msg.getHeader().getAsduAddress().isBroadcast()) {
                logger.debug("Interrogation for: {}", msg.getHeader().getAsduAddress());
                final InterrogationRequest single = new InterrogationRequest(
                        msg.getHeader().getAsduAddress(),
                        cause,
                        msg.getQualifierOfInterrogation(),
                        msg.getHeader().getCauseOfTransmission().getSourceAddress());
                tryStartInterrogation(msg, single);
                return;
            }

            logger.debug("Broadcast interrogation");

            // invoked by the model for each ASDU address it knows
            final Function<ASDUAddress, Void> perMember = new Function<ASDUAddress, Void>() {
                @Override
                public Void apply(ASDUAddress asduAddress) {
                    logger.debug("Broadcast member: {}", asduAddress);
                    final InterrogationRequest member = new InterrogationRequest(
                            asduAddress,
                            cause,
                            msg.getQualifierOfInterrogation(),
                            msg.getHeader().getCauseOfTransmission().getSourceAddress());
                    tryStartInterrogation(msg, member);
                    return null;
                }
            };

            // fallback passed to the model; the log text indicates it runs when no ASDU is registered
            final Runnable noneRegistered = new Runnable() {
                @Override
                public void run() {
                    logger.info("No ASDU common addresses registered");
                    writer.write(msg.mirror(StandardCause.UNKNOWN_ASDU_ADDRESS));
                    writer.flush();
                }
            };

            model.forAllAsdu(perMember, noneRegistered);
        }
    }

    /**
     * Starts a new interrogation for the request unless an equal request is
     * already being processed, in which case only a warning is logged.
     *
     * @param msg     the originating interrogation command
     * @param request the request identifying the interrogation
     */
    private void tryStartInterrogation(InterrogationCommand msg, InterrogationRequest request) {
        logger.debug("tryStartInterrogation - msg: {}, request: {}", msg, request);
        if (currentInterrogation.containsKey(request)) {
            logger.warn("GA already running: {}", request);
            return;
        }
        createNewInterrogation(msg, request);
    }

    /**
     * Creates and registers a new interrogation and asks the model to read all
     * values of the requested ASDU.
     * <p>
     * The runnable handed to the model writes the ACTIVATION_CONFIRM reply on
     * the executor and waits for that write to finish. If the model returns no
     * future, the command is mirrored back with UNKNOWN_ASDU_ADDRESS and nothing
     * is registered. Otherwise the interrogation is stored and
     * {@link #completeInterrogation(InterrogationRequest)} is invoked once the
     * future finishes.
     *
     * @param msg     the originating interrogation command, mirrored on failure
     * @param request the request identifying the interrogation
     */
    private void createNewInterrogation(InterrogationCommand msg, final InterrogationRequest request) {
        logger.debug("Create new interrogation - msg: {}, request: {}", msg, request);

        final InterrogationInstance currentInterrogation = new InterrogationInstance();
        currentInterrogation.state = InterrogationState.WAITING;
        currentInterrogation.asduAddress = request.asduAddress;
        currentInterrogation.qualifierOfInterrogation = request.qualifierOfInterrogation;
        currentInterrogation.expectedCauseOfTransmission = new CauseOfTransmission(DataModuleMessageSource.convert(request.qualifierOfInterrogation), request.sourceAddress);

        ListenableFuture<Void> future = this.model.readAll(request.asduAddress, new Runnable() {

            @Override
            public void run() {
                logger.info("Prepare interrogation");
                try {
                    // NOTE(review): this blocks the calling thread until the executor
                    // has run the write; confirm the model never invokes this
                    // callback on the executor's own (single) thread.
                    DataModuleMessageSource.this.executor.submit(new Runnable() {

                        @Override
                        public void run() {
                            DataModuleMessageSource.this.writer.write(DataModuleMessageSource.createCurrentInterrogationReply(currentInterrogation, StandardCause.ACTIVATION_CONFIRM));
                            DataModuleMessageSource.this.writer.flush();
                        }
                    }).get();
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag so the owner of this thread can still observe it.
                    Thread.currentThread().interrupt();
                    logger.warn("Failed to send out interrogation confirm", e);
                }
                catch (Exception e) {
                    logger.warn("Failed to send out interrogation confirm", e);
                }
                logger.debug("Flushed");
            }
        }, new DataListenerImpl(this, currentInterrogation.expectedCauseOfTransmission));

        if (future == null) {
            logger.info("Failed to start interrogation");
            this.writer.write(msg.mirror(StandardCause.UNKNOWN_ASDU_ADDRESS));
            this.writer.flush();
            return;
        }

        this.currentInterrogation.put(request, currentInterrogation);
        Futures.addCallback(future, new FinallyFutureCallback<Void>() {

            @Override
            public void onFinally() {
                logger.info("Finished interrogation");
                DataModuleMessageSource.this.completeInterrogation(request);
            }
        }, this.executor);
    }

    /**
     * Maps a qualifier of interrogation (QOI) to the cause used for the values
     * sent in response.
     * <p>
     * The value 20 maps to {@code StandardCause.STATION_REQUEST}; the values 21
     * to 36 are resolved through {@code Causes.valueOf}.
     *
     * @param qualifierOfInterrogation the QOI received with the command
     * @return the matching cause, or {@code null} for any value outside 20..36
     */
    private static Cause convert(short qualifierOfInterrogation) {
        if (qualifierOfInterrogation < 20 || qualifierOfInterrogation > 36) {
            return null;
        }
        if (qualifierOfInterrogation == 20) {
            return StandardCause.STATION_REQUEST;
        }
        return Causes.valueOf(qualifierOfInterrogation);
    }

    /**
     * Finishes the interrogation registered for the given request.
     * <p>
     * An interrogation still in WAITING is terminated immediately: the
     * ACTIVATION_TERMINATION reply is written and the entry is removed. An
     * interrogation in RUNNING is switched to FLUSHING and the writer is
     * notified, leaving the termination reply to {@link #poll()}. Unknown
     * requests and other states are ignored.
     *
     * @param request the request identifying the interrogation
     */
    public synchronized void completeInterrogation(InterrogationRequest request) {
        logger.info("Complete interrogation: {}", request);

        final InterrogationInstance instance = currentInterrogation.get(request);
        if (instance == null) {
            return;
        }

        if (instance.state == InterrogationState.WAITING) {
            logger.info("No data. Early finish.");
            writer.write(createCurrentInterrogationReply(instance, StandardCause.ACTIVATION_TERMINATION));
            writer.flush();
            currentInterrogation.remove(request);
            return;
        }

        if (instance.state == InterrogationState.RUNNING) {
            logger.info("Has data. Lazy finish.");
            instance.state = InterrogationState.FLUSHING;
            writer.notifyMoreData();
        }
    }

    /**
     * Marks a WAITING interrogation as RUNNING once a value with its expected
     * cause of transmission is sent.
     * <p>
     * Causes that {@code Causes.isInterrogation} rejects are ignored. For the
     * others, the interrogation is looked up by ASDU address, cause and source
     * address.
     *
     * @param asduAddress         ASDU address of the value being sent
     * @param causeOfTransmission cause of transmission of the value being sent
     */
    private void recordCause(ASDUAddress asduAddress, CauseOfTransmission causeOfTransmission) {
        logger.debug("Record cause: {}, {}", asduAddress, causeOfTransmission);
        if (!Causes.isInterrogation(causeOfTransmission.getCause())) {
            return;
        }

        // The qualifier is not part of InterrogationRequest.equals()/hashCode(),
        // so a placeholder is sufficient for the lookup. The explicit cast is
        // required: an int literal is not implicitly narrowed to short in a
        // method/constructor invocation.
        final InterrogationRequest request = new InterrogationRequest(asduAddress, causeOfTransmission.getCause(), (short) 0, causeOfTransmission.getSourceAddress());
        final InterrogationInstance instance = this.currentInterrogation.get(request);
        logger.debug("Instance: {}", instance);

        if (instance == null || instance.state != InterrogationState.WAITING) {
            return;
        }

        logger.debug("Check cause");
        if (causeOfTransmission.equals(instance.expectedCauseOfTransmission)) {
            logger.debug("Set request to RUNNING: {}", instance);
            instance.state = InterrogationState.RUNNING;
        }
    }

    /**
     * Checked exception that carries a {@link Cause} in addition to its message.
     */
    public static class InterrogationException
    extends Exception {
        private static final long serialVersionUID = 1L;

        /** The cause value supplied at construction time. */
        private final Cause errorCause;

        /**
         * @param message    the detail message
         * @param errorCause the cause value to carry with this exception
         */
        public InterrogationException(String message, Cause errorCause) {
            super(message);
            this.errorCause = errorCause;
        }

        /**
         * @return the cause value supplied at construction time
         */
        public Cause getErrorCause() {
            return errorCause;
        }
    }

    /**
     * Mutable bookkeeping record for one interrogation: its current state, the
     * addressed ASDU, the qualifier it was requested with and the cause of
     * transmission its values are expected to carry.
     */
    private static class InterrogationInstance {
        private InterrogationState state;
        private ASDUAddress asduAddress;
        private short qualifierOfInterrogation;
        private CauseOfTransmission expectedCauseOfTransmission;

        private InterrogationInstance() {
        }

        @Override
        public String toString() {
            return String.format(
                    "[ASDU: %s, QOI: %s, Cause: %s -> %s]",
                    asduAddress,
                    qualifierOfInterrogation,
                    expectedCauseOfTransmission,
                    state);
        }
    }

    /**
     * Immutable map key identifying an interrogation.
     * <p>
     * Equality and hash code are based on ASDU address, cause value and source
     * address only; the qualifier of interrogation is carried along but does
     * not take part in the comparison.
     */
    private static class InterrogationRequest {
        private final ASDUAddress asduAddress;
        private final byte sourceAddress;
        private final short causeValue;
        private final short qualifierOfInterrogation;

        /**
         * @param asduAddress              the addressed ASDU
         * @param cause                    the cause whose value becomes part of the key
         * @param qualifierOfInterrogation the qualifier the interrogation was requested with
         * @param sourceAddress            the source address of the requester
         */
        public InterrogationRequest(ASDUAddress asduAddress, Cause cause, short qualifierOfInterrogation, byte sourceAddress) {
            this.asduAddress = asduAddress;
            this.causeValue = cause.getValue();
            this.qualifierOfInterrogation = qualifierOfInterrogation;
            this.sourceAddress = sourceAddress;
        }

        @Override
        public String toString() {
            return String.format("[%s - %s - %02x]", asduAddress, qualifierOfInterrogation, sourceAddress);
        }

        @Override
        public int hashCode() {
            final int addressHash = asduAddress == null ? 0 : asduAddress.hashCode();
            int hash = 31 + addressHash;
            hash = 31 * hash + causeValue;
            hash = 31 * hash + sourceAddress;
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            final InterrogationRequest that = (InterrogationRequest) obj;
            final boolean sameAddress = asduAddress == null
                    ? that.asduAddress == null
                    : asduAddress.equals(that.asduAddress);
            return sameAddress
                    && causeValue == that.causeValue
                    && sourceAddress == that.sourceAddress;
        }
    }

    private static enum InterrogationState {
        WAITING,
        RUNNING,
        FLUSHING;

    }
}

