package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.class */
public class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMessage> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestServerHandler.class);
    private final ResultPartitionProvider partitionProvider;
    private final TaskEventPublisher taskEventPublisher;
    private final PartitionRequestQueue outboundQueue;

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionRequestServerHandler(ResultPartitionProvider resultPartitionProvider, TaskEventPublisher taskEventPublisher, PartitionRequestQueue partitionRequestQueue) {
        this.partitionProvider = resultPartitionProvider;
        this.taskEventPublisher = taskEventPublisher;
        this.outboundQueue = partitionRequestQueue;
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelRegistered(channelHandlerContext);
    }

    @Override // org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler
    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelUnregistered(channelHandlerContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler
    public void channelRead0(ChannelHandlerContext channelHandlerContext, NettyMessage nettyMessage) throws Exception {
        try {
            Class<?> cls = nettyMessage.getClass();
            if (cls == NettyMessage.PartitionRequest.class) {
                NettyMessage.PartitionRequest partitionRequest = (NettyMessage.PartitionRequest) nettyMessage;
                LOG.debug("Read channel on {}: {}.", channelHandlerContext.channel().localAddress(), partitionRequest);
                try {
                    CreditBasedSequenceNumberingViewReader creditBasedSequenceNumberingViewReader = new CreditBasedSequenceNumberingViewReader(partitionRequest.receiverId, partitionRequest.credit, this.outboundQueue);
                    creditBasedSequenceNumberingViewReader.requestSubpartitionView(this.partitionProvider, partitionRequest.partitionId, partitionRequest.queueIndex);
                    this.outboundQueue.notifyReaderCreated(creditBasedSequenceNumberingViewReader);
                } catch (PartitionNotFoundException e) {
                    respondWithError(channelHandlerContext, e, partitionRequest.receiverId);
                }
            } else if (cls == NettyMessage.TaskEventRequest.class) {
                NettyMessage.TaskEventRequest taskEventRequest = (NettyMessage.TaskEventRequest) nettyMessage;
                if (!this.taskEventPublisher.publish(taskEventRequest.partitionId, taskEventRequest.event)) {
                    respondWithError(channelHandlerContext, new IllegalArgumentException("Task event receiver not found."), taskEventRequest.receiverId);
                }
            } else if (cls == NettyMessage.CancelPartitionRequest.class) {
                this.outboundQueue.cancel(((NettyMessage.CancelPartitionRequest) nettyMessage).receiverId);
            } else if (cls == NettyMessage.CloseRequest.class) {
                this.outboundQueue.close();
            } else if (cls == NettyMessage.AddCredit.class) {
                NettyMessage.AddCredit addCredit = (NettyMessage.AddCredit) nettyMessage;
                this.outboundQueue.addCreditOrResumeConsumption(addCredit.receiverId, networkSequenceViewReader -> {
                    networkSequenceViewReader.addCredit(addCredit.credit);
                });
            } else if (cls == NettyMessage.ResumeConsumption.class) {
                this.outboundQueue.addCreditOrResumeConsumption(((NettyMessage.ResumeConsumption) nettyMessage).receiverId, (v0) -> {
                    v0.resumeConsumption();
                });
            } else {
                LOG.warn("Received unexpected client request: {}", nettyMessage);
            }
        } catch (Throwable th) {
            respondWithError(channelHandlerContext, th);
        }
    }

    private void respondWithError(ChannelHandlerContext channelHandlerContext, Throwable th) {
        channelHandlerContext.writeAndFlush(new NettyMessage.ErrorResponse(th));
    }

    private void respondWithError(ChannelHandlerContext channelHandlerContext, Throwable th, InputChannelID inputChannelID) {
        LOG.debug("Responding with error: {}.", th.getClass());
        channelHandlerContext.writeAndFlush(new NettyMessage.ErrorResponse(th, inputChannelID));
    }
}
