package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.class */
public class SingleInputGate implements InputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
    private final String owningTaskName;
    private final JobID jobId;
    private final IntermediateDataSetID consumedResultId;
    private final ResultPartitionType consumedPartitionType;
    private final int consumedSubpartitionIndex;
    private final int numberOfInputChannels;
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
    private final BitSet channelsWithEndOfPartitionEvents;
    private final TaskActions taskActions;
    private BufferPool bufferPool;
    private boolean hasReceivedAllEndOfPartitionEvents;
    private boolean requestedPartitionsFlag;
    private volatile boolean isReleased;
    private volatile InputGateListener inputGateListener;
    private int numberOfUninitializedChannels;
    private Timer retriggerLocalRequestTimer;
    private final Object requestLock = new Object();
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();
    private final List<TaskEvent> pendingEvents = new ArrayList();

    public SingleInputGate(String str, JobID jobID, IntermediateDataSetID intermediateDataSetID, ResultPartitionType resultPartitionType, int i, int i2, TaskActions taskActions, TaskIOMetricGroup taskIOMetricGroup) {
        this.owningTaskName = (String) Preconditions.checkNotNull(str);
        this.jobId = (JobID) Preconditions.checkNotNull(jobID);
        this.consumedResultId = (IntermediateDataSetID) Preconditions.checkNotNull(intermediateDataSetID);
        this.consumedPartitionType = (ResultPartitionType) Preconditions.checkNotNull(resultPartitionType);
        Preconditions.checkArgument(i >= 0);
        this.consumedSubpartitionIndex = i;
        Preconditions.checkArgument(i2 > 0);
        this.numberOfInputChannels = i2;
        this.inputChannels = new HashMap(i2);
        this.channelsWithEndOfPartitionEvents = new BitSet(i2);
        this.taskActions = (TaskActions) Preconditions.checkNotNull(taskActions);
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public int getNumberOfInputChannels() {
        return this.numberOfInputChannels;
    }

    public IntermediateDataSetID getConsumedResultId() {
        return this.consumedResultId;
    }

    public ResultPartitionType getConsumedPartitionType() {
        return this.consumedPartitionType;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BufferProvider getBufferProvider() {
        return this.bufferPool;
    }

    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public int getPageSize() {
        if (this.bufferPool != null) {
            return this.bufferPool.getMemorySegmentSize();
        }
        throw new IllegalStateException("Input gate has not been initialized with buffers.");
    }

    public int getNumberOfQueuedBuffers() {
        for (int i = 0; i < 3; i++) {
            try {
                int i2 = 0;
                for (InputChannel inputChannel : this.inputChannels.values()) {
                    if (inputChannel instanceof RemoteInputChannel) {
                        i2 += ((RemoteInputChannel) inputChannel).getNumberOfQueuedBuffers();
                    }
                }
                return i2;
            } catch (Exception e) {
            }
        }
        return 0;
    }

    public void setBufferPool(BufferPool bufferPool) {
        Preconditions.checkArgument(this.numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(), "Bug in input gate setup logic: buffer pool has not enough guaranteed buffers for this input gate. Input gates require at least as many buffers as there are input channels.");
        Preconditions.checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool hasalready been set for this input gate.");
        this.bufferPool = (BufferPool) Preconditions.checkNotNull(bufferPool);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void setInputChannel(IntermediateResultPartitionID intermediateResultPartitionID, InputChannel inputChannel) {
        synchronized (this.requestLock) {
            if (this.inputChannels.put(Preconditions.checkNotNull(intermediateResultPartitionID), Preconditions.checkNotNull(inputChannel)) == null && inputChannel.getClass() == UnknownInputChannel.class) {
                this.numberOfUninitializedChannels++;
            }
        }
    }

    public void updateInputChannel(InputChannelDeploymentDescriptor inputChannelDeploymentDescriptor) throws IOException, InterruptedException {
        LocalInputChannel remoteInputChannel;
        synchronized (this.requestLock) {
            if (this.isReleased) {
                return;
            }
            IntermediateResultPartitionID partitionId = inputChannelDeploymentDescriptor.getConsumedPartitionId().getPartitionId();
            InputChannel inputChannel = this.inputChannels.get(partitionId);
            if (inputChannel.getClass() == UnknownInputChannel.class) {
                UnknownInputChannel unknownInputChannel = (UnknownInputChannel) inputChannel;
                ResultPartitionLocation consumedPartitionLocation = inputChannelDeploymentDescriptor.getConsumedPartitionLocation();
                if (consumedPartitionLocation.isLocal()) {
                    remoteInputChannel = unknownInputChannel.toLocalInputChannel();
                } else {
                    if (!consumedPartitionLocation.isRemote()) {
                        throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
                    }
                    remoteInputChannel = unknownInputChannel.toRemoteInputChannel(consumedPartitionLocation.getConnectionId());
                }
                LOG.debug("Updated unknown input channel to {}.", remoteInputChannel);
                this.inputChannels.put(partitionId, remoteInputChannel);
                if (this.requestedPartitionsFlag) {
                    remoteInputChannel.requestSubpartition(this.consumedSubpartitionIndex);
                }
                Iterator<TaskEvent> it = this.pendingEvents.iterator();
                while (it.hasNext()) {
                    remoteInputChannel.sendTaskEvent(it.next());
                }
                int i = this.numberOfUninitializedChannels - 1;
                this.numberOfUninitializedChannels = i;
                if (i == 0) {
                    this.pendingEvents.clear();
                }
            }
        }
    }

    public void retriggerPartitionRequest(IntermediateResultPartitionID intermediateResultPartitionID) throws IOException, InterruptedException {
        synchronized (this.requestLock) {
            if (!this.isReleased) {
                InputChannel inputChannel = this.inputChannels.get(intermediateResultPartitionID);
                Preconditions.checkNotNull(inputChannel, "Unknown input channel with ID " + intermediateResultPartitionID);
                LOG.debug("Retriggering partition request {}:{}.", inputChannel.partitionId, Integer.valueOf(this.consumedSubpartitionIndex));
                if (inputChannel.getClass() == RemoteInputChannel.class) {
                    ((RemoteInputChannel) inputChannel).retriggerSubpartitionRequest(this.consumedSubpartitionIndex);
                } else {
                    if (inputChannel.getClass() != LocalInputChannel.class) {
                        throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + inputChannel.getClass());
                    }
                    LocalInputChannel localInputChannel = (LocalInputChannel) inputChannel;
                    if (this.retriggerLocalRequestTimer == null) {
                        this.retriggerLocalRequestTimer = new Timer(true);
                    }
                    localInputChannel.retriggerSubpartitionRequest(this.retriggerLocalRequestTimer, this.consumedSubpartitionIndex);
                }
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    public void releaseAllResources() throws IOException {
        boolean z = false;
        synchronized (this.requestLock) {
            if (!this.isReleased) {
                try {
                    LOG.debug("{}: Releasing {}.", this.owningTaskName, this);
                    if (this.retriggerLocalRequestTimer != null) {
                        this.retriggerLocalRequestTimer.cancel();
                    }
                    Iterator<InputChannel> it = this.inputChannels.values().iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().releaseAllResources();
                        } catch (IOException e) {
                            LOG.warn("Error during release of channel resources: " + e.getMessage(), e);
                        }
                    }
                    if (this.bufferPool != null) {
                        this.bufferPool.lazyDestroy();
                    }
                    this.isReleased = true;
                    z = true;
                } catch (Throwable th) {
                    this.isReleased = true;
                    throw th;
                }
            }
        }
        if (z) {
            synchronized (this.inputChannelsWithData) {
                this.inputChannelsWithData.notifyAll();
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public boolean isFinished() {
        synchronized (this.requestLock) {
            Iterator<InputChannel> it = this.inputChannels.values().iterator();
            while (it.hasNext()) {
                if (!it.next().isReleased()) {
                    return false;
                }
            }
            return true;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void requestPartitions() throws IOException, InterruptedException {
        synchronized (this.requestLock) {
            if (!this.requestedPartitionsFlag) {
                if (this.isReleased) {
                    throw new IllegalStateException("Already released.");
                }
                if (this.numberOfInputChannels != this.inputChannels.size()) {
                    throw new IllegalStateException("Bug in input gate setup logic: mismatch betweennumber of total input channels and the currently set number of input channels.");
                }
                Iterator<InputChannel> it = this.inputChannels.values().iterator();
                while (it.hasNext()) {
                    it.next().requestSubpartition(this.consumedSubpartitionIndex);
                }
            }
            this.requestedPartitionsFlag = true;
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
        InputChannel remove;
        boolean z;
        if (this.hasReceivedAllEndOfPartitionEvents) {
            return null;
        }
        if (this.isReleased) {
            throw new IllegalStateException("Released");
        }
        requestPartitions();
        synchronized (this.inputChannelsWithData) {
            while (this.inputChannelsWithData.size() == 0) {
                if (this.isReleased) {
                    throw new IllegalStateException("Released");
                }
                this.inputChannelsWithData.wait();
            }
            remove = this.inputChannelsWithData.remove();
            z = this.inputChannelsWithData.size() > 0;
        }
        InputChannel.BufferAndAvailability nextBuffer = remove.getNextBuffer();
        if (nextBuffer == null) {
            throw new IllegalStateException("Bug in input gate/channel logic: input gate got notified by channel about available data, but none was available.");
        }
        if (nextBuffer.moreAvailable()) {
            queueChannel(remove);
        }
        Buffer buffer = nextBuffer.buffer();
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, remove.getChannelIndex(), z);
        }
        AbstractEvent fromBuffer = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
        if (fromBuffer.getClass() == EndOfPartitionEvent.class) {
            this.channelsWithEndOfPartitionEvents.set(remove.getChannelIndex());
            if (this.channelsWithEndOfPartitionEvents.cardinality() == this.numberOfInputChannels) {
                this.hasReceivedAllEndOfPartitionEvents = true;
            }
            remove.notifySubpartitionConsumed();
            remove.releaseAllResources();
        }
        return new BufferOrEvent(fromBuffer, remove.getChannelIndex(), z);
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        synchronized (this.requestLock) {
            Iterator<InputChannel> it = this.inputChannels.values().iterator();
            while (it.hasNext()) {
                it.next().sendTaskEvent(taskEvent);
            }
            if (this.numberOfUninitializedChannels > 0) {
                this.pendingEvents.add(taskEvent);
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.consumer.InputGate
    public void registerListener(InputGateListener inputGateListener) {
        if (this.inputGateListener != null) {
            throw new IllegalStateException("Multiple listeners");
        }
        this.inputGateListener = inputGateListener;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyChannelNonEmpty(InputChannel inputChannel) {
        queueChannel((InputChannel) Preconditions.checkNotNull(inputChannel));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void triggerPartitionStateCheck(ResultPartitionID resultPartitionID) {
        this.taskActions.triggerPartitionProducerStateCheck(this.jobId, this.consumedResultId, resultPartitionID);
    }

    private void queueChannel(InputChannel inputChannel) {
        int size;
        InputGateListener inputGateListener;
        synchronized (this.inputChannelsWithData) {
            size = this.inputChannelsWithData.size();
            this.inputChannelsWithData.add(inputChannel);
            if (size == 0) {
                this.inputChannelsWithData.notifyAll();
            }
        }
        if (size != 0 || (inputGateListener = this.inputGateListener) == null) {
            return;
        }
        inputGateListener.notifyInputGateNonEmpty(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
        return this.inputChannels;
    }

    public static SingleInputGate create(String str, JobID jobID, ExecutionAttemptID executionAttemptID, InputGateDeploymentDescriptor inputGateDeploymentDescriptor, NetworkEnvironment networkEnvironment, TaskActions taskActions, TaskIOMetricGroup taskIOMetricGroup) {
        IntermediateDataSetID intermediateDataSetID = (IntermediateDataSetID) Preconditions.checkNotNull(inputGateDeploymentDescriptor.getConsumedResultId());
        ResultPartitionType resultPartitionType = (ResultPartitionType) Preconditions.checkNotNull(inputGateDeploymentDescriptor.getConsumedPartitionType());
        int consumedSubpartitionIndex = inputGateDeploymentDescriptor.getConsumedSubpartitionIndex();
        Preconditions.checkArgument(consumedSubpartitionIndex >= 0);
        InputChannelDeploymentDescriptor[] inputChannelDeploymentDescriptorArr = (InputChannelDeploymentDescriptor[]) Preconditions.checkNotNull(inputGateDeploymentDescriptor.getInputChannelDeploymentDescriptors());
        SingleInputGate singleInputGate = new SingleInputGate(str, jobID, intermediateDataSetID, resultPartitionType, consumedSubpartitionIndex, inputChannelDeploymentDescriptorArr.length, taskActions, taskIOMetricGroup);
        InputChannel[] inputChannelArr = new InputChannel[inputChannelDeploymentDescriptorArr.length];
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        for (int i4 = 0; i4 < inputChannelArr.length; i4++) {
            ResultPartitionID consumedPartitionId = inputChannelDeploymentDescriptorArr[i4].getConsumedPartitionId();
            ResultPartitionLocation consumedPartitionLocation = inputChannelDeploymentDescriptorArr[i4].getConsumedPartitionLocation();
            if (consumedPartitionLocation.isLocal()) {
                inputChannelArr[i4] = new LocalInputChannel(singleInputGate, i4, consumedPartitionId, networkEnvironment.getResultPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getPartitionRequestInitialBackoff(), networkEnvironment.getPartitionRequestMaxBackoff(), taskIOMetricGroup);
                i++;
            } else if (consumedPartitionLocation.isRemote()) {
                inputChannelArr[i4] = new RemoteInputChannel(singleInputGate, i4, consumedPartitionId, consumedPartitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialBackoff(), networkEnvironment.getPartitionRequestMaxBackoff(), taskIOMetricGroup);
                i2++;
            } else {
                if (!consumedPartitionLocation.isUnknown()) {
                    throw new IllegalStateException("Unexpected partition location.");
                }
                inputChannelArr[i4] = new UnknownInputChannel(singleInputGate, i4, consumedPartitionId, networkEnvironment.getResultPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialBackoff(), networkEnvironment.getPartitionRequestMaxBackoff(), taskIOMetricGroup);
                i3++;
            }
            singleInputGate.setInputChannel(consumedPartitionId.getPartitionId(), inputChannelArr[i4]);
        }
        LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).", new Object[]{Integer.valueOf(inputChannelArr.length), Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3)});
        return singleInputGate;
    }
}
