package org.apache.flink.runtime.io.network.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage.class */
public abstract class NettyMessage {
    static final int HEADER_LENGTH = 9;
    static final int MAGIC_NUMBER = -1159983106;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$BufferResponse.class */
    public static class BufferResponse extends NettyMessage {
        private static final byte ID = 0;
        final Buffer buffer;
        InputChannelID receiverId;
        int sequenceNumber;
        boolean isBuffer;
        int size;
        ByteBuf retainedSlice;

        public BufferResponse() {
            this.buffer = null;
        }

        public BufferResponse(Buffer buffer, int i, InputChannelID inputChannelID) {
            this.buffer = buffer;
            this.sequenceNumber = i;
            this.receiverId = inputChannelID;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isBuffer() {
            return this.isBuffer;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public int getSize() {
            return this.size;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public ByteBuf getNettyBuffer() {
            return this.retainedSlice;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void releaseBuffer() {
            if (this.retainedSlice != null) {
                this.retainedSlice.release();
                this.retainedSlice = null;
            }
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        ByteBuf write(ByteBufAllocator byteBufAllocator) throws IOException {
            ByteBuf byteBuf = null;
            try {
                try {
                    byteBuf = NettyMessage.allocateBuffer(byteBufAllocator, (byte) 0, 25 + this.buffer.getSize());
                    this.receiverId.writeTo(byteBuf);
                    byteBuf.writeInt(this.sequenceNumber);
                    byteBuf.writeBoolean(this.buffer.isBuffer());
                    byteBuf.writeInt(this.buffer.getSize());
                    byteBuf.writeBytes(this.buffer.getNioBuffer());
                    if (this.buffer != null) {
                        this.buffer.recycle();
                    }
                    return byteBuf;
                } catch (Throwable th) {
                    if (byteBuf != null) {
                        byteBuf.release();
                    }
                    throw new IOException(th);
                }
            } catch (Throwable th2) {
                if (this.buffer != null) {
                    this.buffer.recycle();
                }
                throw th2;
            }
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        void readFrom(ByteBuf byteBuf) {
            this.receiverId = InputChannelID.fromByteBuf(byteBuf);
            this.sequenceNumber = byteBuf.readInt();
            this.isBuffer = byteBuf.readBoolean();
            this.size = byteBuf.readInt();
            this.retainedSlice = byteBuf.readSlice(this.size);
            this.retainedSlice.retain();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$CancelPartitionRequest.class */
    public static class CancelPartitionRequest extends NettyMessage {
        static final byte ID = 4;
        InputChannelID receiverId;

        public CancelPartitionRequest() {
        }

        public CancelPartitionRequest(InputChannelID inputChannelID) {
            this.receiverId = inputChannelID;
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        ByteBuf write(ByteBufAllocator byteBufAllocator) throws Exception {
            ByteBuf byteBuf = null;
            try {
                byteBuf = NettyMessage.allocateBuffer(byteBufAllocator, (byte) 4, 16);
                this.receiverId.writeTo(byteBuf);
                return byteBuf;
            } catch (Throwable th) {
                if (byteBuf != null) {
                    byteBuf.release();
                }
                throw new IOException(th);
            }
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        void readFrom(ByteBuf byteBuf) throws Exception {
            this.receiverId = InputChannelID.fromByteBuf(byteBuf);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$CloseRequest.class */
    public static class CloseRequest extends NettyMessage {
        private static final byte ID = 5;

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        ByteBuf write(ByteBufAllocator byteBufAllocator) throws Exception {
            return NettyMessage.allocateBuffer(byteBufAllocator, (byte) 5, 0);
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        void readFrom(ByteBuf byteBuf) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$ErrorResponse.class */
    public static class ErrorResponse extends NettyMessage {
        private static final byte ID = 1;
        Throwable cause;
        InputChannelID receiverId;

        public ErrorResponse() {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public ErrorResponse(Throwable th) {
            this.cause = th;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public ErrorResponse(Throwable th, InputChannelID inputChannelID) {
            this.cause = th;
            this.receiverId = inputChannelID;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean isFatalError() {
            return this.receiverId == null;
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        ByteBuf write(ByteBufAllocator byteBufAllocator) throws IOException {
            ByteBuf allocateBuffer = NettyMessage.allocateBuffer(byteBufAllocator, (byte) 1);
            try {
                ObjectOutputStream objectOutputStream = new ObjectOutputStream(new ByteBufOutputStream(allocateBuffer));
                Throwable th = null;
                try {
                    try {
                        objectOutputStream.writeObject(this.cause);
                        if (this.receiverId != null) {
                            allocateBuffer.writeBoolean(true);
                            this.receiverId.writeTo(allocateBuffer);
                        } else {
                            allocateBuffer.writeBoolean(false);
                        }
                        allocateBuffer.setInt(0, allocateBuffer.readableBytes());
                        if (objectOutputStream != null) {
                            if (0 != 0) {
                                try {
                                    objectOutputStream.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                objectOutputStream.close();
                            }
                        }
                        return allocateBuffer;
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th3) {
                allocateBuffer.release();
                if (th3 instanceof IOException) {
                    throw ((IOException) th3);
                }
                throw new IOException(th3);
            }
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        void readFrom(ByteBuf byteBuf) throws Exception {
            ObjectInputStream objectInputStream = new ObjectInputStream(new ByteBufInputStream(byteBuf));
            Throwable th = null;
            try {
                Object readObject = objectInputStream.readObject();
                if (!(readObject instanceof Throwable)) {
                    throw new ClassCastException("Read object expected to be of type Throwable, actual type is " + readObject.getClass() + ".");
                }
                this.cause = (Throwable) readObject;
                if (byteBuf.readBoolean()) {
                    this.receiverId = InputChannelID.fromByteBuf(byteBuf);
                }
                if (objectInputStream != null) {
                    if (0 == 0) {
                        objectInputStream.close();
                        return;
                    }
                    try {
                        objectInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                if (objectInputStream != null) {
                    if (0 != 0) {
                        try {
                            objectInputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        objectInputStream.close();
                    }
                }
                throw th3;
            }
        }
    }

    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$NettyMessageDecoder.class */
    static class NettyMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
        /* renamed from: decode, reason: avoid collision after fix types in other method */
        protected void decode2(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            NettyMessage closeRequest;
            if (byteBuf.readInt() != NettyMessage.MAGIC_NUMBER) {
                throw new IllegalStateException("Network stream corrupted: received incorrect magic number.");
            }
            byte readByte = byteBuf.readByte();
            if (readByte == 0) {
                closeRequest = new BufferResponse();
            } else if (readByte == 2) {
                closeRequest = new PartitionRequest();
            } else if (readByte == 3) {
                closeRequest = new TaskEventRequest();
            } else if (readByte == 1) {
                closeRequest = new ErrorResponse();
            } else if (readByte == 4) {
                closeRequest = new CancelPartitionRequest();
            } else {
                if (readByte != 5) {
                    throw new IllegalStateException("Received unknown message from producer: " + byteBuf);
                }
                closeRequest = new CloseRequest();
            }
            if (closeRequest != null) {
                closeRequest.readFrom(byteBuf);
                list.add(closeRequest);
            }
        }

        @Override // io.netty.handler.codec.MessageToMessageDecoder
        protected /* bridge */ /* synthetic */ void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception {
            decode2(channelHandlerContext, byteBuf, (List<Object>) list);
        }
    }

    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$NettyMessageEncoder.class */
    static class NettyMessageEncoder extends ChannelOutboundHandlerAdapter {
        @Override // io.netty.channel.ChannelOutboundHandlerAdapter, io.netty.channel.ChannelOutboundHandler
        public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
            if (!(obj instanceof NettyMessage)) {
                channelHandlerContext.write(obj, channelPromise);
                return;
            }
            ByteBuf byteBuf = null;
            try {
                try {
                    byteBuf = ((NettyMessage) obj).write(channelHandlerContext.alloc());
                    if (byteBuf != null) {
                        channelHandlerContext.write(byteBuf, channelPromise);
                    }
                } catch (Throwable th) {
                    throw new IOException("Error while serializing message: " + obj, th);
                }
            } catch (Throwable th2) {
                if (byteBuf != null) {
                    channelHandlerContext.write(byteBuf, channelPromise);
                }
                throw th2;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static LengthFieldBasedFrameDecoder createFrameLengthDecoder() {
            return new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, -4, 4);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$PartitionRequest.class */
    public static class PartitionRequest extends NettyMessage {
        static final byte ID = 2;
        ResultPartitionID partitionId;
        int queueIndex;
        InputChannelID receiverId;

        public PartitionRequest() {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public PartitionRequest(ResultPartitionID resultPartitionID, int i, InputChannelID inputChannelID) {
            this.partitionId = resultPartitionID;
            this.queueIndex = i;
            this.receiverId = inputChannelID;
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        ByteBuf write(ByteBufAllocator byteBufAllocator) throws IOException {
            ByteBuf byteBuf = null;
            try {
                byteBuf = NettyMessage.allocateBuffer(byteBufAllocator, (byte) 2, 52);
                this.partitionId.getPartitionId().writeTo(byteBuf);
                this.partitionId.getProducerId().writeTo(byteBuf);
                byteBuf.writeInt(this.queueIndex);
                this.receiverId.writeTo(byteBuf);
                return byteBuf;
            } catch (Throwable th) {
                if (byteBuf != null) {
                    byteBuf.release();
                }
                throw new IOException(th);
            }
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        public void readFrom(ByteBuf byteBuf) {
            this.partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(byteBuf), ExecutionAttemptID.fromByteBuf(byteBuf));
            this.queueIndex = byteBuf.readInt();
            this.receiverId = InputChannelID.fromByteBuf(byteBuf);
        }

        public String toString() {
            return String.format("PartitionRequest(%s:%d)", this.partitionId, Integer.valueOf(this.queueIndex));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyMessage$TaskEventRequest.class */
    public static class TaskEventRequest extends NettyMessage {
        static final byte ID = 3;
        TaskEvent event;
        InputChannelID receiverId;
        ResultPartitionID partitionId;

        public TaskEventRequest() {
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public TaskEventRequest(TaskEvent taskEvent, ResultPartitionID resultPartitionID, InputChannelID inputChannelID) {
            this.event = taskEvent;
            this.receiverId = inputChannelID;
            this.partitionId = resultPartitionID;
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        ByteBuf write(ByteBufAllocator byteBufAllocator) throws IOException {
            ByteBuf byteBuf = null;
            try {
                ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(this.event);
                byteBuf = NettyMessage.allocateBuffer(byteBufAllocator, (byte) 3, 4 + serializedEvent.remaining() + 16 + 16 + 16);
                byteBuf.writeInt(serializedEvent.remaining());
                byteBuf.writeBytes(serializedEvent);
                this.partitionId.getPartitionId().writeTo(byteBuf);
                this.partitionId.getProducerId().writeTo(byteBuf);
                this.receiverId.writeTo(byteBuf);
                return byteBuf;
            } catch (Throwable th) {
                if (byteBuf != null) {
                    byteBuf.release();
                }
                throw new IOException(th);
            }
        }

        @Override // org.apache.flink.runtime.io.network.netty.NettyMessage
        public void readFrom(ByteBuf byteBuf) throws IOException {
            ByteBuffer allocate = ByteBuffer.allocate(byteBuf.readInt());
            byteBuf.readBytes(allocate);
            allocate.flip();
            this.event = (TaskEvent) EventSerializer.fromSerializedEvent(allocate, getClass().getClassLoader());
            this.partitionId = new ResultPartitionID(IntermediateResultPartitionID.fromByteBuf(byteBuf), ExecutionAttemptID.fromByteBuf(byteBuf));
            this.receiverId = InputChannelID.fromByteBuf(byteBuf);
        }
    }

    NettyMessage() {
    }

    abstract ByteBuf write(ByteBufAllocator byteBufAllocator) throws Exception;

    abstract void readFrom(ByteBuf byteBuf) throws Exception;

    /* JADX INFO: Access modifiers changed from: private */
    public static ByteBuf allocateBuffer(ByteBufAllocator byteBufAllocator, byte b) {
        return allocateBuffer(byteBufAllocator, b, 0);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ByteBuf allocateBuffer(ByteBufAllocator byteBufAllocator, byte b, int i) {
        ByteBuf directBuffer = i != 0 ? byteBufAllocator.directBuffer(9 + i) : byteBufAllocator.directBuffer();
        directBuffer.writeInt(9 + i);
        directBuffer.writeInt(MAGIC_NUMBER);
        directBuffer.writeByte(b);
        return directBuffer;
    }
}
