/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.quickfixj;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.component.quickfixj.MessagePredicate;
import org.apache.camel.component.quickfixj.QuickfixjEventCategory;
import org.apache.camel.component.quickfixj.QuickfixjEventListener;
import quickfix.Message;
import quickfix.SessionID;

/**
 * Event listener that matches incoming QuickFIX/J messages against pending
 * correlation rules so that a caller waiting for a reply can be released.
 *
 * <p>A rule is registered by {@link #getReply(Exchange)} and is unregistered
 * either when a matching message is seen in {@link #onEvent} or when the
 * callable returned by {@code getReply} finishes (reply, timeout or
 * interruption), so that unanswered requests do not accumulate.
 */
public class MessageCorrelator
implements QuickfixjEventListener {
    /** Default time, in milliseconds, to wait for a correlated reply. */
    public static final long DEFAULT_CORRELATION_TIMEOUT = 1000L;

    /**
     * Pending rules. A copy-on-write list lets {@link #onEvent} remove entries
     * while iterating and lets waiters remove their own rule concurrently.
     */
    private final List<MessageCorrelationRule> rules = new CopyOnWriteArrayList<MessageCorrelationRule>();

    /**
     * Registers a correlation rule for the given exchange and returns a task
     * that waits for the matching reply.
     *
     * <p>The match predicate is read from the {@code "CorrelationCriteria"}
     * exchange property and the wait time (milliseconds) from
     * {@code "CorrelationTimeout"}, falling back to
     * {@link #DEFAULT_CORRELATION_TIMEOUT}. The rule is registered immediately,
     * before the returned callable is invoked. Note that if the callable is
     * never invoked, the rule stays registered until a matching message arrives.
     *
     * @param exchange the exchange awaiting a reply
     * @return a callable that blocks until a matching message arrives and returns
     *         it; it throws {@link ExchangeTimedOutException} if none arrives
     *         within the timeout
     */
    public Callable<Message> getReply(Exchange exchange) {
        MessagePredicate messageCriteria = (MessagePredicate)exchange.getProperty("CorrelationCriteria");
        final MessageCorrelationRule correlationRule = new MessageCorrelationRule(exchange, messageCriteria);
        this.rules.add(correlationRule);
        final long timeout = exchange.getProperty("CorrelationTimeout", (Object)DEFAULT_CORRELATION_TIMEOUT, Long.class);
        return new Callable<Message>(){

            @Override
            public Message call() throws Exception {
                try {
                    if (!correlationRule.getLatch().await(timeout, TimeUnit.MILLISECONDS)) {
                        throw new ExchangeTimedOutException(correlationRule.getExchange(), timeout);
                    }
                    return correlationRule.getReplyMessage();
                } finally {
                    // Always unregister: on timeout or interruption the rule would
                    // otherwise stay in the list forever. Removing a rule that
                    // onEvent already removed is a harmless no-op.
                    MessageCorrelator.this.rules.remove(correlationRule);
                }
            }
        };
    }

    /**
     * Offers the message to every pending rule; each rule whose predicate
     * accepts it receives the message as its reply, is unregistered, and has
     * its waiter released.
     *
     * @param eventCategory the event category (not used for matching)
     * @param sessionID the session the event belongs to (not used for matching)
     * @param message the message to correlate; {@code null} is ignored
     * @throws Exception if a rule's predicate throws while evaluating the message
     */
    @Override
    public void onEvent(QuickfixjEventCategory eventCategory, SessionID sessionID, Message message) throws Exception {
        if (message == null) {
            return;
        }
        for (MessageCorrelationRule rule : this.rules) {
            MessagePredicate criteria = rule.getMessageCriteria();
            // A rule registered without criteria can never match; skip it rather
            // than fail with a NullPointerException and starve the other rules.
            if (criteria == null || !criteria.evaluate(message)) {
                continue;
            }
            // Publish the reply before counting down so the waiter observes it.
            rule.setReplyMessage(message);
            this.rules.remove(rule);
            rule.getLatch().countDown();
        }
    }

    /**
     * Holds one pending correlation: the originating exchange, the predicate
     * used to recognise the reply, the reply itself once found, and a
     * single-count latch the waiter blocks on.
     */
    private static class MessageCorrelationRule {
        private final Exchange exchange;
        private final CountDownLatch latch = new CountDownLatch(1);
        private final MessagePredicate messageCriteria;
        // Written before latch.countDown() and read after latch.await().
        private Message replyMessage;

        MessageCorrelationRule(Exchange exchange, MessagePredicate messageCriteria) {
            this.exchange = exchange;
            this.messageCriteria = messageCriteria;
        }

        public void setReplyMessage(Message message) {
            this.replyMessage = message;
        }

        public Message getReplyMessage() {
            return this.replyMessage;
        }

        public CountDownLatch getLatch() {
            return this.latch;
        }

        public Exchange getExchange() {
            return this.exchange;
        }

        public MessagePredicate getMessageCriteria() {
            return this.messageCriteria;
        }
    }
}

