/*
 * Decompiled with CFR 0.152.
 */
package org.cometd.server;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.cometd.bayeux.ChannelId;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.client.ClientSession;
import org.cometd.bayeux.server.LocalSession;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.common.AbstractClientSession;
import org.cometd.server.BayeuxServerImpl;
import org.cometd.server.ServerSessionImpl;

/**
 * {@link LocalSession} implementation that hands its messages directly to a
 * {@link BayeuxServerImpl} (see {@link #doSend}) and feeds the replies back
 * into {@link #receive(Message.Mutable, Promise)}.
 *
 * <p>The session is usable for sending only between a successful
 * {@link #handshake(Map, ClientSession.MessageListener)} and a successful
 * {@code /meta/disconnect} reply; outside of that window there is no server
 * session and the accessors that need one throw {@link IllegalStateException}.
 */
public class LocalSessionImpl extends AbstractClientSession implements LocalSession {
    /** Messages accumulated while batching; drained by {@link #sendBatch()}. */
    private final Queue<ServerMessage.Mutable> _queue = new ConcurrentLinkedQueue<>();
    private final BayeuxServerImpl _bayeux;
    private final String _idHint;
    /**
     * The server-side session; {@code null} until a handshake succeeds and
     * again after a successful disconnect reply has been received.
     * NOTE(review): it is assigned from promise callbacks, which may not run
     * on the thread that reads it, hence {@code volatile} — confirm against
     * the threading model of the {@code Promise} completions.
     */
    private volatile ServerSessionImpl _session;

    /**
     * @param bayeux the server that handles the messages sent by this session
     * @param idHint hint passed to the {@link ServerSessionImpl} created by
     *               {@code handshake()}; also used by {@link #toString()}
     *               while there is no server session
     */
    public LocalSessionImpl(BayeuxServerImpl bayeux, String idHint) {
        this._bayeux = bayeux;
        this._idHint = idHint;
    }

    /**
     * Delegates to the superclass and, once that completes successfully,
     * forgets the server session if {@code message} is a successful
     * {@code /meta/disconnect} reply, then succeeds {@code promise}.
     * A failure of the superclass processing fails {@code promise}.
     */
    @Override
    public void receive(Message.Mutable message, Promise<Void> promise) {
        super.receive(message, Promise.from(y -> {
            if ("/meta/disconnect".equals(message.getChannel()) && message.isSuccessful()) {
                this._session = null;
            }
            promise.succeed(null);
        }, promise::fail));
    }

    @Override
    protected AbstractClientSession.AbstractSessionChannel newChannel(ChannelId channelId) {
        return new LocalChannel(channelId);
    }

    @Override
    protected ChannelId newChannelId(String channelId) {
        return this._bayeux.newChannelId(channelId);
    }

    /**
     * Sends the messages that were queued while batching, at most as many as
     * were in the queue when this method was entered.
     */
    @Override
    protected void sendBatch() {
        int size = this._queue.size();
        while (size-- > 0) {
            ServerMessage.Mutable message = this._queue.poll();
            if (message == null) {
                // The queue was drained by someone else after size() was read;
                // there is nothing left to send.
                break;
            }
            this.doSend(this._session, message, Promise.noop());
        }
    }

    /**
     * @return the server-side session
     * @throws IllegalStateException if there is currently no server session
     */
    @Override
    public ServerSession getServerSession() {
        // Read the field once so the null check and the return see the same value.
        ServerSessionImpl session = this._session;
        if (session == null) {
            throw new IllegalStateException("Method handshake() not invoked for local session " + this);
        }
        return session;
    }

    /**
     * Sends a {@code /meta/handshake} message built from {@code template} and,
     * if the reply is successful, stores the new server session and sends a
     * {@code /meta/connect} message with advice {@code interval = -1}.
     * A failure of either send is turned into an unsuccessful reply that is
     * delivered through {@link #receive(Message.Mutable, Promise)}.
     *
     * @param template fields copied into the handshake message, may be {@code null}
     * @param callback listener registered for the handshake message id
     * @throws IllegalStateException if this session already has a server session
     */
    @Override
    public void handshake(Map<String, Object> template, ClientSession.MessageListener callback) {
        if (this._session != null) {
            throw new IllegalStateException("Method handshake() already invoked for local session " + this);
        }
        ServerSessionImpl session = new ServerSessionImpl(this._bayeux, this, this._idHint);
        ServerMessage.Mutable hsMessage = this.newMessage();
        if (template != null) {
            hsMessage.putAll(template);
        }
        String messageId = this.newMessageId();
        hsMessage.setId(messageId);
        hsMessage.setChannel("/meta/handshake");
        this.registerCallback(messageId, callback);
        this.doSend(session, hsMessage, Promise.from(hsReply -> {
            if (hsReply != null && hsReply.isSuccessful()) {
                this._session = session;
                ServerMessage.Mutable cnMessage = this.newMessage();
                cnMessage.setId(this.newMessageId());
                cnMessage.setChannel("/meta/connect");
                cnMessage.getAdvice(true).put("interval", -1L);
                cnMessage.setClientId(session.getId());
                this.doSend(session, cnMessage, Promise.from(cnReply -> {
                }, failure -> this.messageFailure(cnMessage, failure)));
            }
        }, failure -> this.messageFailure(hsMessage, failure)));
    }

    /**
     * Builds an unsuccessful reply for {@code message} (same id and channel,
     * same {@code subscription} field when present) carrying a {@code failure}
     * map with the original message and the cause, and delivers it through
     * {@link #receive(Message.Mutable, Promise)}.
     */
    private void messageFailure(ServerMessage message, Throwable cause) {
        ServerMessage.Mutable failed = this.newMessage();
        failed.setId(message.getId());
        failed.setSuccessful(false);
        failed.setChannel(message.getChannel());
        if (message.containsKey("subscription")) {
            failed.put("subscription", message.get("subscription"));
        }
        Map<String, Object> failure = new HashMap<>();
        failed.put("failure", failure);
        failure.put("message", message);
        failure.put("exception", cause);
        this.receive(failed, Promise.noop());
    }

    /**
     * Sends a {@code /meta/disconnect} message and then ends every open batch
     * so that the message is not left in the queue. Does nothing, and does not
     * register {@code callback}, when there is no server session.
     *
     * @param callback listener registered for the disconnect message id
     */
    @Override
    public void disconnect(ClientSession.MessageListener callback) {
        // Read the field once so the null check and getId() see the same value.
        ServerSessionImpl session = this._session;
        if (session != null) {
            ServerMessage.Mutable message = this.newMessage();
            String messageId = this.newMessageId();
            message.setId(messageId);
            message.setChannel("/meta/disconnect");
            message.setClientId(session.getId());
            this.registerCallback(messageId, callback);
            this.send(message);
            while (this.isBatching()) {
                this.endBatch();
            }
        }
    }

    /**
     * @return the id of the server-side session
     * @throws IllegalStateException if there is currently no server session
     */
    @Override
    public String getId() {
        ServerSessionImpl session = this._session;
        if (session == null) {
            throw new IllegalStateException("Method handshake() not invoked for local session " + this);
        }
        return session.getId();
    }

    /** @return whether a server session exists and reports itself connected */
    @Override
    public boolean isConnected() {
        ServerSessionImpl session = this._session;
        return session != null && session.isConnected();
    }

    /** @return whether a server session exists and reports itself handshook */
    @Override
    public boolean isHandshook() {
        ServerSessionImpl session = this._session;
        return session != null && session.isHandshook();
    }

    @Override
    public String toString() {
        ServerSessionImpl session = this._session;
        return "L:" + (session == null ? this._idHint + "_<disconnected>" : session.getId());
    }

    /**
     * Sends {@code message} on behalf of the current server session, first
     * copying it into a new {@link ServerMessage.Mutable} if it is not one.
     */
    @Override
    protected void send(Message.Mutable message) {
        if (message instanceof ServerMessage.Mutable) {
            this.send(this._session, (ServerMessage.Mutable)message);
        } else {
            ServerMessage.Mutable mutable = this.newMessage();
            mutable.putAll(message);
            this.send(this._session, mutable);
        }
    }

    /**
     * Queues {@code message} while batching, otherwise sends it immediately.
     * Note that a queued message is later sent by {@link #sendBatch()} with
     * the server session current at that time, not with {@code session}.
     */
    protected void send(ServerSessionImpl session, ServerMessage.Mutable message) {
        if (this.isBatching()) {
            this._queue.add(message);
        } else {
            this.doSend(session, message, Promise.noop());
        }
    }

    /**
     * Runs {@code message} through the outgoing extensions, the server and the
     * reply extensions, and delivers the reply to
     * {@link #receive(Message.Mutable, Promise)}.
     *
     * <p>{@code promise} is succeeded with the reply once it has been
     * received, or with {@code null} when reply extension yields no reply;
     * it is failed if any step fails. When the outgoing extensions return
     * {@code false} the message is not handled by the server and a
     * {@code 404::message_deleted} error reply is received instead.
     *
     * @throws IllegalStateException if {@code session} is {@code null}, i.e.
     *         there is no server session to send with
     */
    private void doSend(ServerSessionImpl session, ServerMessage.Mutable message, Promise<ServerMessage.Mutable> promise) {
        if (session == null) {
            // Same diagnostic as getId()/getServerSession() instead of a bare NPE.
            throw new IllegalStateException("Method handshake() not invoked for local session " + this);
        }
        String messageId = message.getId();
        message.setClientId(session.getId());
        this.extendOutgoing(message, Promise.from(result -> {
            // Restore the id captured before extendOutgoing() ran
            // (presumably because extensions may have changed it — confirm).
            message.setId(messageId);
            if (result) {
                this._bayeux.handle(session, message, Promise.from(handled ->
                        this._bayeux.extendReply(session, this._session, handled, Promise.from(reply -> {
                            if (reply != null) {
                                this.receive(reply, Promise.from(y -> promise.succeed(reply), promise::fail));
                            } else {
                                promise.succeed(null);
                            }
                        }, promise::fail)), promise::fail));
            } else {
                ServerMessage.Mutable deleted = this._bayeux.createReply(message);
                this._bayeux.error(deleted, "404::message_deleted");
                this.receive(deleted, Promise.from(y -> promise.succeed(deleted), promise::fail));
            }
        }, promise::fail));
    }

    @Override
    protected ServerMessage.Mutable newMessage() {
        return this._bayeux.newMessage();
    }

    /** Channel of a {@link LocalSessionImpl}; its session is the enclosing instance. */
    protected class LocalChannel extends AbstractClientSession.AbstractSessionChannel {
        protected LocalChannel(ChannelId channelId) {
            super(LocalSessionImpl.this, channelId);
        }

        /**
         * @return the enclosing session, after {@code throwIfReleased()} has
         *         had the chance to reject the call
         */
        @Override
        public ClientSession getSession() {
            this.throwIfReleased();
            return LocalSessionImpl.this;
        }
    }
}

