package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessageDecoder;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegate.class */
public class NettyMessageClientDecoderDelegate extends ChannelInboundHandlerAdapter {
    private final NettyMessageDecoder bufferResponseDecoder;
    private ByteBuf frameHeaderBuffer;
    private NettyMessageDecoder currentDecoder;
    private final Logger LOG = LoggerFactory.getLogger(NettyMessageClientDecoderDelegate.class);
    private final NettyMessageDecoder nonBufferResponseDecoder = new NonBufferResponseDecoder();

    /* JADX INFO: Access modifiers changed from: package-private */
    public NettyMessageClientDecoderDelegate(NetworkClientHandler networkClientHandler) {
        this.bufferResponseDecoder = new BufferResponseDecoder(new NetworkBufferAllocator((NetworkClientHandler) Preconditions.checkNotNull(networkClientHandler)));
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.bufferResponseDecoder.onChannelActive(channelHandlerContext);
        this.nonBufferResponseDecoder.onChannelActive(channelHandlerContext);
        this.frameHeaderBuffer = channelHandlerContext.alloc().directBuffer(9);
        super.channelActive(channelHandlerContext);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        IOUtils.cleanup(this.LOG, this.bufferResponseDecoder, this.nonBufferResponseDecoder);
        this.frameHeaderBuffer.release();
        super.channelInactive(channelHandlerContext);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!(obj instanceof ByteBuf)) {
            channelHandlerContext.fireChannelRead(obj);
            return;
        }
        ByteBuf byteBuf = (ByteBuf) obj;
        while (byteBuf.isReadable()) {
            try {
                if (this.currentDecoder != null) {
                    NettyMessageDecoder.DecodingResult onChannelRead = this.currentDecoder.onChannelRead(byteBuf);
                    if (!onChannelRead.isFinished()) {
                        break;
                    }
                    channelHandlerContext.fireChannelRead((Object) onChannelRead.getMessage());
                    this.currentDecoder = null;
                    this.frameHeaderBuffer.clear();
                }
                decodeFrameHeader(byteBuf);
            } catch (Throwable th) {
                byteBuf.release();
                throw th;
            }
        }
        Preconditions.checkState(!byteBuf.isReadable(), "Not all data of the received buffer consumed.");
        byteBuf.release();
    }

    private void decodeFrameHeader(ByteBuf byteBuf) {
        ByteBuf accumulate = ByteBufUtils.accumulate(this.frameHeaderBuffer, byteBuf, 9, this.frameHeaderBuffer.readableBytes());
        if (accumulate != null) {
            int readInt = accumulate.readInt();
            Preconditions.checkState(readInt >= 0, "The length field of current message must be non-negative");
            Preconditions.checkState(accumulate.readInt() == -1159983106, "Network stream corrupted: received incorrect magic number.");
            byte readByte = accumulate.readByte();
            if (readByte == 0) {
                this.currentDecoder = this.bufferResponseDecoder;
            } else {
                this.currentDecoder = this.nonBufferResponseDecoder;
            }
            this.currentDecoder.onNewMessageReceived(readByte, readInt - 9);
        }
    }
}
