package quickfix.mina;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;

/* loaded from: input_file:quickfix/mina/ThreadPerSessionEventHandlingStrategy.class */
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
    private final ConcurrentMap<SessionID, MessageDispatchingThread> dispatchers = new ConcurrentHashMap();
    private final SessionConnector sessionConnector;
    private final int queueCapacity;
    private volatile Executor executor;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:quickfix/mina/ThreadPerSessionEventHandlingStrategy$MessageDispatchingThread.class */
    public class MessageDispatchingThread extends ThreadAdapter {
        private final Session quickfixSession;
        private final BlockingQueue<Message> messages;
        private volatile boolean stopped;
        private volatile boolean stopping;

        private MessageDispatchingThread(Session session, int i, Executor executor) {
            super("QF/J Session dispatcher: " + session.getSessionID(), executor);
            this.quickfixSession = session;
            this.messages = new LinkedBlockingQueue(i);
        }

        public void enqueue(Message message) {
            if (message == EventHandlingStrategy.END_OF_STREAM && this.stopping) {
                return;
            }
            try {
                this.messages.put(message);
            } catch (InterruptedException e) {
                this.quickfixSession.getLog().onErrorEvent(e.toString());
            }
        }

        public int getQueueSize() {
            return this.messages.size();
        }

        @Override // quickfix.mina.ThreadPerSessionEventHandlingStrategy.ThreadAdapter
        void doRun() {
            while (!this.stopping) {
                try {
                    Message nextMessage = ThreadPerSessionEventHandlingStrategy.this.getNextMessage(this.messages);
                    if (nextMessage != null) {
                        this.quickfixSession.next(nextMessage);
                        if (nextMessage == EventHandlingStrategy.END_OF_STREAM) {
                            this.stopping = true;
                        }
                    }
                } catch (InterruptedException e) {
                    LogUtil.logThrowable(this.quickfixSession.getSessionID(), "Message dispatcher interrupted", e);
                    this.stopping = true;
                } catch (Throwable th) {
                    LogUtil.logThrowable(this.quickfixSession.getSessionID(), "Error during message processing", th);
                }
            }
            if (!this.messages.isEmpty()) {
                ArrayList arrayList = new ArrayList();
                this.messages.drainTo(arrayList);
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    try {
                        this.quickfixSession.next((Message) it.next());
                    } catch (Throwable th2) {
                        LogUtil.logThrowable(this.quickfixSession.getSessionID(), "Error during message processing", th2);
                    }
                }
            }
            ThreadPerSessionEventHandlingStrategy.this.dispatchers.remove(this.quickfixSession.getSessionID());
            this.stopped = true;
        }

        public void stopDispatcher() {
            enqueue(EventHandlingStrategy.END_OF_STREAM);
            this.stopping = true;
            this.stopped = true;
        }

        public boolean isStopped() {
            return this.stopped;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:quickfix/mina/ThreadPerSessionEventHandlingStrategy$ThreadAdapter.class */
    public static abstract class ThreadAdapter implements Runnable {
        private final Executor executor;
        private final String name;

        /* loaded from: input_file:quickfix/mina/ThreadPerSessionEventHandlingStrategy$ThreadAdapter$DedicatedThreadExecutor.class */
        static final class DedicatedThreadExecutor implements Executor {
            private final String name;

            DedicatedThreadExecutor(String str) {
                this.name = str;
            }

            @Override // java.util.concurrent.Executor
            public void execute(Runnable runnable) {
                new Thread(runnable, this.name).start();
            }
        }

        public ThreadAdapter(String str, Executor executor) {
            this.name = str;
            this.executor = executor != null ? executor : new DedicatedThreadExecutor(str);
        }

        public void start() {
            this.executor.execute(this);
        }

        @Override // java.lang.Runnable
        public final void run() {
            Thread currentThread = Thread.currentThread();
            String name = currentThread.getName();
            try {
                if (!this.name.equals(name)) {
                    currentThread.setName(this.name + " (" + name + ")");
                }
                doRun();
            } finally {
                currentThread.setName(name);
            }
        }

        abstract void doRun();
    }

    public ThreadPerSessionEventHandlingStrategy(SessionConnector sessionConnector, int i) {
        this.sessionConnector = sessionConnector;
        this.queueCapacity = i;
    }

    public void setExecutor(Executor executor) {
        this.executor = executor;
    }

    @Override // quickfix.mina.EventHandlingStrategy
    public void onMessage(Session session, Message message) {
        MessageDispatchingThread messageDispatchingThread = this.dispatchers.get(session.getSessionID());
        if (messageDispatchingThread == null) {
            messageDispatchingThread = this.dispatchers.computeIfAbsent(session.getSessionID(), sessionID -> {
                MessageDispatchingThread messageDispatchingThread2 = new MessageDispatchingThread(session, this.queueCapacity, this.executor);
                startDispatcherThread(messageDispatchingThread2);
                return messageDispatchingThread2;
            });
        }
        if (message != null) {
            messageDispatchingThread.enqueue(message);
        }
    }

    @Override // quickfix.mina.EventHandlingStrategy
    public SessionConnector getSessionConnector() {
        return this.sessionConnector;
    }

    protected void startDispatcherThread(MessageDispatchingThread messageDispatchingThread) {
        messageDispatchingThread.start();
    }

    public void stopDispatcherThreads() {
        Collection<MessageDispatchingThread> values = this.dispatchers.values();
        Iterator<MessageDispatchingThread> it = values.iterator();
        while (it.hasNext()) {
            it.next().stopDispatcher();
        }
        while (!values.isEmpty()) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            values.removeIf((v0) -> {
                return v0.isStopped();
            });
        }
    }

    protected MessageDispatchingThread getDispatcher(SessionID sessionID) {
        return this.dispatchers.get(sessionID);
    }

    protected Message getNextMessage(BlockingQueue<Message> blockingQueue) throws InterruptedException {
        return blockingQueue.poll(250L, TimeUnit.MILLISECONDS);
    }

    @Override // quickfix.mina.EventHandlingStrategy
    public int getQueueSize() {
        int i = 0;
        Iterator<MessageDispatchingThread> it = this.dispatchers.values().iterator();
        while (it.hasNext()) {
            i += it.next().getQueueSize();
        }
        return i;
    }

    @Override // quickfix.mina.EventHandlingStrategy
    public int getQueueSize(SessionID sessionID) {
        MessageDispatchingThread messageDispatchingThread = this.dispatchers.get(sessionID);
        if (messageDispatchingThread != null) {
            return messageDispatchingThread.getQueueSize();
        }
        return 0;
    }
}
