package org.apache.flink.streaming.runtime.io.recovery;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/recovery/DemultiplexingRecordDeserializer.class */
/**
 * A {@link RecordDeserializer} that splits one input into several "virtual channels", one per
 * {@link SubtaskConnectionDescriptor}, each backed by its own delegate deserializer.
 *
 * <p>Usage protocol as implemented here: the caller first picks the active virtual channel with
 * {@link #select(SubtaskConnectionDescriptor)}; {@link #setNextBuffer(Buffer)} and {@link
 * #getNextRecord(DeserializationDelegate)} then operate on that channel only. Watermarks and
 * stream statuses are not forwarded verbatim but combined across <em>all</em> virtual channels
 * (minimum watermark, "active if any channel is active").
 *
 * <p>NOTE(review): the package name and the message thrown by {@link #getUnconsumedBuffer()}
 * suggest this class is only used while recovering in-flight data after rescaling - confirm
 * against the callers.
 *
 * @param <T> type of the values carried by the {@link StreamRecord}s that are filtered
 */
class DemultiplexingRecordDeserializer<T> implements RecordDeserializer<DeserializationDelegate<StreamElement>> {

    /**
     * Shared instance without any virtual channel; returned by {@link #create} when there is
     * nothing to map. Deliberately kept as a raw type so that existing code can use it for any
     * {@code T}.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public static final DemultiplexingRecordDeserializer UNMAPPED = new DemultiplexingRecordDeserializer(Collections.emptyMap());

    /** All known virtual channels, keyed by their selector. */
    private final Map<SubtaskConnectionDescriptor, VirtualChannel<T>> channels;

    /** Channel chosen by the last call to {@link #select}; {@code null} until then. */
    private VirtualChannel<T> currentVirtualChannel;

    /**
     * One virtual channel: a delegate deserializer plus a record filter, together with the last
     * watermark and stream status that were read from it.
     *
     * @param <T> type of the values carried by the filtered {@link StreamRecord}s
     */
    public static class VirtualChannel<T> {
        private final RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer;
        private final Predicate<StreamRecord<T>> recordFilter;

        /** Last watermark read from this channel; {@link Watermark#UNINITIALIZED} until one is seen. */
        Watermark lastWatermark = Watermark.UNINITIALIZED;

        /** Last stream status read from this channel; starts out as {@link StreamStatus#ACTIVE}. */
        StreamStatus streamStatus = StreamStatus.ACTIVE;

        /** Result of the most recent call to the delegate deserializer; {@code null} before the first call. */
        private RecordDeserializer.DeserializationResult lastResult;

        VirtualChannel(RecordDeserializer<DeserializationDelegate<StreamElement>> recordDeserializer, Predicate<StreamRecord<T>> predicate) {
            this.deserializer = recordDeserializer;
            this.recordFilter = predicate;
        }

        /**
         * Reads from the delegate deserializer until it finds an element worth returning or the
         * current buffer is consumed.
         *
         * <p>A full-record result is returned for records accepted by the filter, for watermarks
         * and for stream statuses; the latter two are also remembered in {@link #lastWatermark}
         * and {@link #streamStatus}. Every other full element (a rejected record or any other
         * element kind) is skipped.
         *
         * @param deserializationDelegate delegate that receives the deserialized element
         * @return the delegate deserializer's result for a returned element, or {@code
         *     PARTIAL_RECORD} when the buffer was consumed without one
         * @throws IOException if the delegate deserializer fails
         */
        public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate<StreamElement> deserializationDelegate) throws IOException {
            do {
                lastResult = deserializer.getNextRecord(deserializationDelegate);
                if (lastResult.isFullRecord()) {
                    final StreamElement element = deserializationDelegate.getInstance();
                    if (element.isRecord() && recordFilter.test(element.asRecord())) {
                        return lastResult;
                    } else if (element.isWatermark()) {
                        lastWatermark = element.asWatermark();
                        return lastResult;
                    } else if (element.isStreamStatus()) {
                        streamStatus = element.asStreamStatus();
                        return lastResult;
                    }
                    // Anything else is dropped here; keep reading while the buffer has data.
                }
            } while (!lastResult.isBufferConsumed());
            return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
        }

        /**
         * Hands the next buffer to the delegate deserializer.
         *
         * @throws IOException if the delegate deserializer fails
         */
        public void setNextBuffer(Buffer buffer) throws IOException {
            deserializer.setNextBuffer(buffer);
        }

        /** Clears the delegate deserializer. The remembered last result is left untouched. */
        public void clear() {
            deserializer.clear();
        }

        /**
         * @return {@code true} if the delegate deserializer has been called at least once and its
         *     last result reported that the buffer was not yet consumed
         */
        public boolean hasPartialData() {
            return lastResult != null && !lastResult.isBufferConsumed();
        }
    }

    /**
     * @param map virtual channels keyed by selector; must not be {@code null}. The map is stored
     *     as-is, not copied.
     */
    public DemultiplexingRecordDeserializer(Map<SubtaskConnectionDescriptor, VirtualChannel<T>> map) {
        this.channels = Preconditions.checkNotNull(map);
    }

    /**
     * Makes the virtual channel registered for the given descriptor the current one.
     *
     * @throws IllegalStateException if no virtual channel is registered for the descriptor; in
     *     that case the current channel has already been reset to {@code null}
     */
    public void select(SubtaskConnectionDescriptor subtaskConnectionDescriptor) {
        currentVirtualChannel = channels.get(subtaskConnectionDescriptor);
        if (currentVirtualChannel == null) {
            throw new IllegalStateException("Cannot select " + subtaskConnectionDescriptor + "; known channels are " + channels.keySet());
        }
    }

    /** @return {@code true} if at least one virtual channel is registered */
    public boolean hasMappings() {
        return !channels.isEmpty();
    }

    /** @return a live view of the selectors of all registered virtual channels */
    @VisibleForTesting
    Collection<SubtaskConnectionDescriptor> getVirtualChannelSelectors() {
        return channels.keySet();
    }

    /**
     * Forwards the buffer to the currently selected virtual channel; {@link #select} must have
     * succeeded before, otherwise this fails with a {@link NullPointerException}.
     */
    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public void setNextBuffer(Buffer buffer) throws IOException {
        currentVirtualChannel.setNextBuffer(buffer);
    }

    /**
     * Not supported by this deserializer.
     *
     * @throws IllegalStateException always
     */
    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public CloseableIterator<Buffer> getUnconsumedBuffer() throws IOException {
        throw new IllegalStateException("Cannot checkpoint while recovering");
    }

    /** @return {@code true} if any virtual channel reports {@link VirtualChannel#hasPartialData()} */
    public boolean hasPartialData() {
        return channels.values().stream().anyMatch(VirtualChannel::hasPartialData);
    }

    /**
     * Reads the next element from the currently selected virtual channel.
     *
     * <ul>
     *   <li>Records (and latency markers) are returned unchanged.
     *   <li>A watermark is replaced by the minimum of the last watermarks of all virtual
     *       channels; while that minimum is still {@link Watermark#UNINITIALIZED} the watermark
     *       is suppressed and reading continues.
     *   <li>A stream status is replaced by {@link StreamStatus#ACTIVE} if any virtual channel is
     *       currently active; otherwise it is returned as read.
     * </ul>
     *
     * @param deserializationDelegate delegate that receives the element to emit
     * @return the virtual channel's result for an emitted element, or {@code PARTIAL_RECORD}
     *     when the buffer was consumed without anything to emit
     * @throws IOException if the underlying deserializer fails
     */
    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public RecordDeserializer.DeserializationResult getNextRecord(DeserializationDelegate<StreamElement> deserializationDelegate) throws IOException {
        RecordDeserializer.DeserializationResult result;
        do {
            result = currentVirtualChannel.getNextRecord(deserializationDelegate);
            if (result.isFullRecord()) {
                final StreamElement element = deserializationDelegate.getInstance();
                // NOTE(review): VirtualChannel#getNextRecord as written never hands back a
                // latency marker, so the second half of this check looks purely defensive.
                if (element.isRecord() || element.isLatencyMarker()) {
                    return result;
                }
                if (element.isWatermark()) {
                    final Watermark minWatermark = minWatermarkAcrossChannels();
                    if (!minWatermark.equals(Watermark.UNINITIALIZED)) {
                        deserializationDelegate.setInstance(minWatermark);
                        return result;
                    }
                    // Some channel has not produced a watermark yet: emit nothing, keep reading.
                } else if (element.isStreamStatus()) {
                    if (anyChannelActive()) {
                        deserializationDelegate.setInstance(StreamStatus.ACTIVE);
                    }
                    return result;
                }
            }
        } while (!result.isBufferConsumed());
        return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
    }

    /** Returns the smallest (by timestamp) of the last watermarks of all virtual channels. */
    private Watermark minWatermarkAcrossChannels() {
        return channels.values().stream()
                .map(channel -> channel.lastWatermark)
                .min(Comparator.comparingLong(Watermark::getTimestamp))
                .orElseThrow(() -> new IllegalStateException("Should always have a watermark"));
    }

    /** Returns whether the last stream status of at least one virtual channel is active. */
    private boolean anyChannelActive() {
        return channels.values().stream().anyMatch(channel -> channel.streamStatus.isActive());
    }

    /** Clears every virtual channel, not only the currently selected one. */
    @Override // org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer
    public void clear() {
        channels.values().forEach(VirtualChannel::clear);
    }

    /**
     * Builds a deserializer with one virtual channel per combination of old subtask index and
     * mapped old channel index of the given input channel.
     *
     * @param inputChannelInfo the input channel whose gate and channel index are looked up in the
     *     descriptor's channel mapping
     * @param inflightDataRescalingDescriptor supplies the old subtask indexes, the channel
     *     mapping and the per-subtask "ambiguous" flag
     * @param function creates the delegate deserializer of each virtual channel; it is invoked
     *     once per virtual channel with the total number of virtual channels
     * @param function2 creates the record filter for virtual channels whose old subtask is
     *     flagged ambiguous; all other channels use {@code RecordFilter.all()}
     * @return {@link #UNMAPPED} if there are no old subtask indexes or no mapped channel indexes,
     *     otherwise a new deserializer
     */
    public static <T> DemultiplexingRecordDeserializer<T> create(InputChannelInfo inputChannelInfo, InflightDataRescalingDescriptor inflightDataRescalingDescriptor, Function<Integer, RecordDeserializer<DeserializationDelegate<StreamElement>>> function, Function<InputChannelInfo, Predicate<StreamRecord<T>>> function2) {
        final int[] oldSubtaskIndexes = inflightDataRescalingDescriptor.getOldSubtaskIndexes();
        if (oldSubtaskIndexes.length == 0) {
            return unmapped();
        }
        final int[] oldChannelIndexes = inflightDataRescalingDescriptor
                .getChannelMapping(inputChannelInfo.getGateIdx())
                .getMappedIndexes(inputChannelInfo.getInputChannelIdx());
        if (oldChannelIndexes.length == 0) {
            return unmapped();
        }

        final int totalChannels = oldSubtaskIndexes.length * oldChannelIndexes.length;
        final Map<SubtaskConnectionDescriptor, VirtualChannel<T>> virtualChannels = Maps.newHashMapWithExpectedSize(totalChannels);
        for (int subtask : oldSubtaskIndexes) {
            for (int channel : oldChannelIndexes) {
                final SubtaskConnectionDescriptor selector = new SubtaskConnectionDescriptor(subtask, channel);
                // Both factories are invoked per virtual channel so that no instance is shared.
                virtualChannels.put(
                        selector,
                        new VirtualChannel<>(
                                function.apply(totalChannels),
                                inflightDataRescalingDescriptor.isAmbiguous(subtask)
                                        ? function2.apply(inputChannelInfo)
                                        : RecordFilter.all()));
            }
        }
        return new DemultiplexingRecordDeserializer<>(virtualChannels);
    }

    /** Returns {@link #UNMAPPED} typed for the caller; it has no channels, so {@code T} is never used. */
    @SuppressWarnings("unchecked")
    private static <T> DemultiplexingRecordDeserializer<T> unmapped() {
        return UNMAPPED;
    }

    @Override
    public String toString() {
        return "DemultiplexingRecordDeserializer{channels=" + channels.keySet() + '}';
    }
}
