/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.protocol;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import java.io.IOException;
import java.util.LinkedList;
import org.infinispan.client.hotrod.VersionedMetadata;
import org.infinispan.client.hotrod.impl.protocol.AbstractVersionedInputStream;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelInboundHandlerDefaults;

public class ChannelInputStream
extends AbstractVersionedInputStream
implements ChannelInboundHandlerDefaults {
    public static final String NAME = "stream";
    private final int totalLength;
    private final LinkedList<ByteBuf> bufs = new LinkedList();
    private int totalReceived;
    private int totalRead;
    private Throwable throwable;

    public ChannelInputStream(VersionedMetadata versionedMetadata, Runnable afterClose, int totalLength) {
        super(versionedMetadata, afterClose);
        this.totalLength = totalLength;
    }

    @Override
    public synchronized int read() throws IOException {
        while (true) {
            if (this.bufs.isEmpty()) {
                if (this.totalRead >= this.totalLength) {
                    return -1;
                }
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    IOException ioException = new IOException(e);
                    if (this.throwable != null) {
                        ioException.addSuppressed(this.throwable);
                    }
                    throw ioException;
                }
                if (this.throwable == null) continue;
                throw new IOException(this.throwable);
            }
            ByteBuf buf = this.bufs.peekFirst();
            if (buf.isReadable()) {
                ++this.totalRead;
                assert (this.totalRead <= this.totalLength);
                return buf.readUnsignedByte();
            }
            buf.release();
            this.bufs.removeFirst();
        }
    }

    @Override
    public synchronized int read(byte[] b, int off, int len) throws IOException {
        int numRead = 0;
        while (true) {
            if (numRead == 0 && this.bufs.isEmpty()) {
                if (this.totalRead >= this.totalLength) {
                    return -1;
                }
                try {
                    this.wait();
                }
                catch (InterruptedException e) {
                    IOException ioException = new IOException(e);
                    if (this.throwable != null) {
                        ioException.addSuppressed(this.throwable);
                    }
                    throw ioException;
                }
                if (this.throwable == null) continue;
                throw new IOException(this.throwable);
            }
            if (this.bufs.isEmpty()) {
                return numRead;
            }
            ByteBuf buf = this.bufs.peekFirst();
            int readable = buf.readableBytes();
            if (readable > 0) {
                int prevReaderIndex = buf.readerIndex();
                buf.readBytes(b, off + numRead, Math.min(len - numRead, readable));
                int nowRead = buf.readerIndex() - prevReaderIndex;
                numRead += nowRead;
                this.totalRead += nowRead;
                assert (this.totalRead <= this.totalLength) : "Now read: " + nowRead + ", read: " + this.totalRead + ", length" + this.totalLength;
                if (numRead < len) continue;
                return numRead;
            }
            buf.release();
            this.bufs.removeFirst();
        }
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf)msg;
            if (this.totalReceived + buf.readableBytes() <= this.totalLength) {
                this.bufs.add(buf);
                this.totalReceived += buf.readableBytes();
                if (this.totalReceived == this.totalLength) {
                    ctx.pipeline().remove(this);
                }
            } else if (this.totalReceived < this.totalLength) {
                this.bufs.add(buf.retainedSlice(buf.readerIndex(), this.totalLength - this.totalReceived));
                buf.readerIndex(buf.readerIndex() + this.totalLength - this.totalReceived);
                this.totalReceived = this.totalLength;
                ctx.pipeline().remove(this);
                ctx.fireChannelRead(buf);
            } else {
                ctx.fireChannelRead(buf);
            }
            this.notifyAll();
        } else {
            ctx.fireChannelRead(msg);
        }
    }

    @Override
    public synchronized void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.throwable = cause;
        this.notifyAll();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            ctx.fireUserEventTriggered(evt);
        }
    }

    @Override
    public synchronized void close() throws IOException {
        super.close();
        for (ByteBuf buf : this.bufs) {
            buf.release();
        }
    }

    public boolean moveReadable(ByteBuf buf) {
        if (buf.isReadable()) {
            int numReceived = Math.min(this.totalLength - this.totalReceived, buf.readableBytes());
            this.bufs.add(buf.readBytes(numReceived));
            this.totalReceived += numReceived;
        }
        return this.totalReceived < this.totalLength;
    }
}

