package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.datastream.LegacyWindowOperatorType;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.class */
public class WindowOperator<K, IN, ACC, OUT, W extends Window> extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>> implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    protected final WindowAssigner<? super IN, W> windowAssigner;
    protected final KeySelector<IN, K> keySelector;
    protected final Trigger<? super IN, ? super W> trigger;
    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    protected final ListStateDescriptor<Tuple2<W, W>> mergingWindowsDescriptor;
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<W> windowSerializer;
    protected final long allowedLateness;
    protected transient TimestampedCollector<OUT> timestampedCollector;
    protected transient WindowOperator<K, IN, ACC, OUT, W>.Context context;
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    protected transient InternalTimerService<W> internalTimerService;
    private final LegacyWindowOperatorType legacyWindowOperatorType;
    private transient PriorityQueue<StreamRecord<IN>> restoredFromLegacyAlignedOpRecords;
    private transient PriorityQueue<Timer<K, W>> restoredFromLegacyProcessingTimeTimers;
    private transient PriorityQueue<Timer<K, W>> restoredFromLegacyEventTimeTimers;
    private static final int BEGIN_OF_STATE_MAGIC_NUMBER = 267505218;
    private static final int BEGIN_OF_PANE_MAGIC_NUMBER = -1159790379;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Context.class */
    public class Context implements Trigger.OnMergeContext {
        protected K key;
        protected W window;
        protected Collection<W> mergedWindows;

        public Context(K k, W w) {
            this.key = k;
            this.window = w;
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public MetricGroup getMetricGroup() {
            return WindowOperator.this.getMetricGroup();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentWatermark() {
            return WindowOperator.this.internalTimerService.currentWatermark();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, Class<S> cls, S s) {
            Preconditions.checkNotNull(cls, "The state type class must not be null");
            try {
                return getKeyValueState(str, (TypeInformation<TypeInformation<S>>) TypeExtractor.getForClass(cls), (TypeInformation<S>) s);
            } catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + cls.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends Serializable> ValueState<S> getKeyValueState(String str, TypeInformation<S> typeInformation, S s) {
            Preconditions.checkNotNull(str, "The name of the state must not be null");
            Preconditions.checkNotNull(typeInformation, "The state type information must not be null");
            return (ValueState) getPartitionedState(new ValueStateDescriptor(str, typeInformation.createSerializer(WindowOperator.this.getExecutionConfig()), s));
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S) WindowOperator.this.getPartitionedState(this.window, WindowOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows == null || this.mergedWindows.size() <= 0) {
                return;
            }
            try {
                WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(this.window, this.mergedWindows, WindowOperator.this.windowSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Error while merging state.", e);
            }
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public long getCurrentProcessingTime() {
            return WindowOperator.this.internalTimerService.currentProcessingTime();
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerProcessingTimeTimer(long j) {
            WindowOperator.this.internalTimerService.registerProcessingTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void registerEventTimeTimer(long j) {
            WindowOperator.this.internalTimerService.registerEventTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteProcessingTimeTimer(long j) {
            WindowOperator.this.internalTimerService.deleteProcessingTimeTimer(this.window, j);
        }

        @Override // org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
        public void deleteEventTimeTimer(long j) {
            WindowOperator.this.internalTimerService.deleteEventTimeTimer(this.window, j);
        }

        public TriggerResult onElement(StreamRecord<IN> streamRecord) throws Exception {
            return WindowOperator.this.trigger.onElement(streamRecord.getValue(), streamRecord.getTimestamp(), this.window, this);
        }

        public TriggerResult onProcessingTime(long j) throws Exception {
            return WindowOperator.this.trigger.onProcessingTime(j, this.window, this);
        }

        public TriggerResult onEventTime(long j) throws Exception {
            return WindowOperator.this.trigger.onEventTime(j, this.window, this);
        }

        public void onMerge(Collection<W> collection) throws Exception {
            this.mergedWindows = collection;
            WindowOperator.this.trigger.onMerge(this.window, this);
        }

        public void clear() throws Exception {
            WindowOperator.this.trigger.clear(this.window, this);
        }

        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/WindowOperator$Timer.class */
    public static class Timer<K, W extends Window> implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long j, K k, W w) {
            this.timestamp = j;
            this.key = k;
            this.window = w;
        }

        @Override // java.lang.Comparable
        public int compareTo(Timer<K, W> timer) {
            return Long.compare(this.timestamp, timer.timestamp);
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Timer timer = (Timer) obj;
            return this.timestamp == timer.timestamp && this.key.equals(timer.key) && this.window.equals(timer.window);
        }

        public int hashCode() {
            return (31 * ((31 * ((int) (this.timestamp ^ (this.timestamp >>> 32)))) + this.key.hashCode())) + this.window.hashCode();
        }

        public String toString() {
            return "Timer{timestamp=" + this.timestamp + ", key=" + this.key + ", window=" + this.window + '}';
        }
    }

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<? extends AppendingState<IN, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, OUT, K, W> internalWindowFunction, Trigger<? super IN, ? super W> trigger, long j) {
        this(windowAssigner, typeSerializer, keySelector, typeSerializer2, stateDescriptor, internalWindowFunction, trigger, j, LegacyWindowOperatorType.NONE);
    }

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> typeSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> typeSerializer2, StateDescriptor<? extends AppendingState<IN, ACC>, ?> stateDescriptor, InternalWindowFunction<ACC, OUT, K, W> internalWindowFunction, Trigger<? super IN, ? super W> trigger, long j, LegacyWindowOperatorType legacyWindowOperatorType) {
        super(internalWindowFunction);
        this.context = new Context(null, null);
        Preconditions.checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner), "The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. This assigner is only used with the AccumulatingProcessingTimeWindowOperator and the AggregatingProcessingTimeWindowOperator");
        Preconditions.checkArgument(j >= 0);
        Preconditions.checkArgument(stateDescriptor == null || stateDescriptor.isSerializerInitialized(), "window state serializer is not properly initialized");
        this.windowAssigner = (WindowAssigner) Preconditions.checkNotNull(windowAssigner);
        this.windowSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
        this.keySelector = (KeySelector) Preconditions.checkNotNull(keySelector);
        this.keySerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer2);
        this.windowStateDescriptor = stateDescriptor;
        this.trigger = (Trigger) Preconditions.checkNotNull(trigger);
        this.allowedLateness = j;
        this.legacyWindowOperatorType = legacyWindowOperatorType;
        if (windowAssigner instanceof MergingWindowAssigner) {
            this.mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", new TupleSerializer(Tuple2.class, new TypeSerializer[]{typeSerializer, typeSerializer}));
        } else {
            this.mergingWindowsDescriptor = null;
        }
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.timestampedCollector = new TimestampedCollector<>(this.output);
        this.internalTimerService = (InternalTimerService<W>) getInternalTimerService("window-timers", this.windowSerializer, this);
        this.context = new Context(null, null);
        this.windowAssignerContext = new WindowAssigner.WindowAssignerContext() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.1
            @Override // org.apache.flink.streaming.api.windowing.assigners.WindowAssigner.WindowAssignerContext
            public long getCurrentProcessingTime() {
                return WindowOperator.this.internalTimerService.currentProcessingTime();
            }
        };
        registerRestoredLegacyStateState();
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        this.timestampedCollector = null;
        this.context = null;
        this.windowAssignerContext = null;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void dispose() throws Exception {
        super.dispose();
        this.timestampedCollector = null;
        this.context = null;
        this.windowAssignerContext = null;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        Collection<W> assignWindows = this.windowAssigner.assignWindows(streamRecord.getValue(), streamRecord.getTimestamp(), this.windowAssignerContext);
        final K k = (K) getKeyedStateBackend().getCurrentKey();
        if (!(this.windowAssigner instanceof MergingWindowAssigner)) {
            for (W w : assignWindows) {
                if (!isLate(w)) {
                    AppendingState appendingState = (AppendingState) getPartitionedState(w, this.windowSerializer, this.windowStateDescriptor);
                    appendingState.add(streamRecord.getValue());
                    this.context.key = k;
                    this.context.window = w;
                    TriggerResult onElement = this.context.onElement(streamRecord);
                    if (onElement.isFire()) {
                        Object obj = appendingState.get();
                        if (obj != null) {
                            emitWindowContents(w, obj);
                        }
                    }
                    if (onElement.isPurge()) {
                        appendingState.clear();
                    }
                    registerCleanupTimer(w);
                }
            }
            return;
        }
        MergingWindowSet mergingWindowSet = getMergingWindowSet();
        for (W w2 : assignWindows) {
            W w3 = (W) mergingWindowSet.addWindow(w2, new MergingWindowSet.MergeFunction<W>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.2
                public void merge(W w4, Collection<W> collection, W w5, Collection<W> collection2) throws Exception {
                    WindowOperator.this.context.key = (K) k;
                    WindowOperator.this.context.window = w4;
                    WindowOperator.this.context.onMerge(collection);
                    for (W w6 : collection) {
                        WindowOperator.this.context.window = w6;
                        WindowOperator.this.context.clear();
                        WindowOperator.this.deleteCleanupTimer(w6);
                    }
                    WindowOperator.this.getKeyedStateBackend().mergePartitionedStates(w5, collection2, WindowOperator.this.windowSerializer, WindowOperator.this.windowStateDescriptor);
                }

                /* JADX WARN: Multi-variable type inference failed */
                @Override // org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.MergeFunction
                public /* bridge */ /* synthetic */ void merge(Object obj2, Collection collection, Object obj3, Collection collection2) throws Exception {
                    merge((Collection) obj2, (Collection<Collection>) collection, (Collection) obj3, (Collection<Collection>) collection2);
                }
            });
            if (isLate(w3)) {
                mergingWindowSet.retireWindow(w3);
            } else {
                Window stateWindow = mergingWindowSet.getStateWindow(w3);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + w2 + " is not in in-flight window set.");
                }
                AppendingState appendingState2 = (AppendingState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                appendingState2.add(streamRecord.getValue());
                this.context.key = k;
                this.context.window = w3;
                TriggerResult onElement2 = this.context.onElement(streamRecord);
                if (onElement2.isFire()) {
                    Object obj2 = appendingState2.get();
                    if (obj2 != null) {
                        emitWindowContents(w3, obj2);
                    }
                }
                if (onElement2.isPurge()) {
                    appendingState2.clear();
                }
                registerCleanupTimer(w3);
            }
        }
        mergingWindowSet.persist();
    }

    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        AppendingState<IN, ACC> appendingState;
        this.context.key = internalTimer.getKey();
        this.context.window = internalTimer.getNamespace();
        MergingWindowSet<W> mergingWindowSet = null;
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            W stateWindow = mergingWindowSet.getStateWindow(this.context.window);
            appendingState = stateWindow == null ? null : (AppendingState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
        } else {
            appendingState = (AppendingState) getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
        }
        ACC acc = null;
        if (appendingState != null) {
            acc = appendingState.get();
        }
        if (acc != null) {
            TriggerResult onEventTime = this.context.onEventTime(internalTimer.getTimestamp());
            if (onEventTime.isFire()) {
                emitWindowContents(this.context.window, acc);
            }
            if (onEventTime.isPurge()) {
                appendingState.clear();
            }
        }
        if (this.windowAssigner.isEventTime() && isCleanupTime(this.context.window, internalTimer.getTimestamp())) {
            clearAllState(this.context.window, appendingState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        AppendingState<IN, ACC> appendingState;
        this.context.key = internalTimer.getKey();
        this.context.window = internalTimer.getNamespace();
        MergingWindowSet<W> mergingWindowSet = null;
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            mergingWindowSet = getMergingWindowSet();
            W stateWindow = mergingWindowSet.getStateWindow(this.context.window);
            appendingState = stateWindow == null ? null : (AppendingState) getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
        } else {
            appendingState = (AppendingState) getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
        }
        ACC acc = null;
        if (appendingState != null) {
            acc = appendingState.get();
        }
        if (acc != null) {
            TriggerResult onProcessingTime = this.context.onProcessingTime(internalTimer.getTimestamp());
            if (onProcessingTime.isFire()) {
                emitWindowContents(this.context.window, acc);
            }
            if (onProcessingTime.isPurge()) {
                appendingState.clear();
            }
        }
        if (!this.windowAssigner.isEventTime() && isCleanupTime(this.context.window, internalTimer.getTimestamp())) {
            clearAllState(this.context.window, appendingState, mergingWindowSet);
        }
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    private void clearAllState(W w, AppendingState<IN, ACC> appendingState, MergingWindowSet<W> mergingWindowSet) throws Exception {
        appendingState.clear();
        this.context.clear();
        if (mergingWindowSet != null) {
            mergingWindowSet.retireWindow(w);
            mergingWindowSet.persist();
        }
    }

    private void emitWindowContents(W w, ACC acc) throws Exception {
        this.timestampedCollector.setAbsoluteTimestamp(w.maxTimestamp());
        ((InternalWindowFunction) this.userFunction).apply(this.context.key, this.context.window, acc, this.timestampedCollector);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MergingWindowSet<W> getMergingWindowSet() throws Exception {
        return new MergingWindowSet<>((MergingWindowAssigner) this.windowAssigner, (ListState) getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.mergingWindowsDescriptor));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isLate(W w) {
        return this.windowAssigner.isEventTime() && cleanupTime(w) <= this.internalTimerService.currentWatermark();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.context.registerEventTimeTimer(cleanupTime);
        } else {
            this.context.registerProcessingTimeTimer(cleanupTime);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void deleteCleanupTimer(W w) {
        long cleanupTime = cleanupTime(w);
        if (cleanupTime == Long.MAX_VALUE) {
            return;
        }
        if (this.windowAssigner.isEventTime()) {
            this.context.deleteEventTimeTimer(cleanupTime);
        } else {
            this.context.deleteProcessingTimeTimer(cleanupTime);
        }
    }

    private long cleanupTime(W w) {
        if (!this.windowAssigner.isEventTime()) {
            return w.maxTimestamp();
        }
        long maxTimestamp = w.maxTimestamp() + this.allowedLateness;
        if (maxTimestamp >= w.maxTimestamp()) {
            return maxTimestamp;
        }
        return Long.MAX_VALUE;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final boolean isCleanupTime(W w, long j) {
        return j == cleanupTime(w);
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator, org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator
    public void restoreState(FSDataInputStream fSDataInputStream) throws Exception {
        super.restoreState(fSDataInputStream);
        LOG.info("{} (taskIdx={}) restoring {} state from an older Flink version.", new Object[]{getClass().getSimpleName(), this.legacyWindowOperatorType, Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask())});
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(fSDataInputStream);
        switch (this.legacyWindowOperatorType) {
            case NONE:
                restoreFromLegacyWindowOperator(dataInputViewStreamWrapper);
                return;
            case FAST_ACCUMULATING:
            case FAST_AGGREGATING:
                restoreFromLegacyAlignedWindowOperator(dataInputViewStreamWrapper);
                return;
            default:
                return;
        }
    }

    public void registerRestoredLegacyStateState() throws Exception {
        switch (this.legacyWindowOperatorType) {
            case NONE:
                reregisterStateFromLegacyWindowOperator();
                return;
            case FAST_ACCUMULATING:
            case FAST_AGGREGATING:
                reregisterStateFromLegacyAlignedWindowOperator();
                return;
            default:
                return;
        }
    }

    private void restoreFromLegacyAlignedWindowOperator(DataInputViewStreamWrapper dataInputViewStreamWrapper) throws IOException {
        Preconditions.checkArgument(this.legacyWindowOperatorType != LegacyWindowOperatorType.NONE);
        dataInputViewStreamWrapper.readLong();
        long readLong = dataInputViewStreamWrapper.readLong();
        validateMagicNumber(BEGIN_OF_STATE_MAGIC_NUMBER, dataInputViewStreamWrapper.readInt());
        this.restoredFromLegacyAlignedOpRecords = new PriorityQueue<>(42, new Comparator<StreamRecord<IN>>() { // from class: org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.3
            @Override // java.util.Comparator
            public int compare(StreamRecord<IN> streamRecord, StreamRecord<IN> streamRecord2) {
                return Long.compare(streamRecord.getTimestamp(), streamRecord2.getTimestamp());
            }
        });
        switch (this.legacyWindowOperatorType) {
            case FAST_ACCUMULATING:
                restoreElementsFromLegacyAccumulatingAlignedWindowOperator(dataInputViewStreamWrapper, readLong);
                break;
            case FAST_AGGREGATING:
                restoreElementsFromLegacyAggregatingAlignedWindowOperator(dataInputViewStreamWrapper, readLong);
                break;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} (taskIdx={}) restored {} events from legacy {}.", new Object[]{getClass().getSimpleName(), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), Integer.valueOf(this.restoredFromLegacyAlignedOpRecords.size()), this.legacyWindowOperatorType});
        }
    }

    private void restoreElementsFromLegacyAccumulatingAlignedWindowOperator(DataInputView dataInputView, long j) throws IOException {
        int readInt = dataInputView.readInt();
        long paneSize = getPaneSize();
        long j2 = j - (readInt * paneSize);
        ArrayListSerializer arrayListSerializer = new ArrayListSerializer(getStateDescriptor().getSerializer());
        while (readInt > 0) {
            validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, dataInputView.readInt());
            j2 += paneSize - 1;
            for (int readInt2 = dataInputView.readInt() - 1; readInt2 >= 0; readInt2--) {
                this.keySerializer.deserialize(dataInputView);
                Iterator it = arrayListSerializer.deserialize(dataInputView).iterator();
                while (it.hasNext()) {
                    this.restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(it.next(), j2));
                }
            }
            readInt--;
        }
    }

    private void restoreElementsFromLegacyAggregatingAlignedWindowOperator(DataInputView dataInputView, long j) throws IOException {
        int readInt = dataInputView.readInt();
        long paneSize = getPaneSize();
        long j2 = j - (readInt * paneSize);
        while (readInt > 0) {
            validateMagicNumber(BEGIN_OF_PANE_MAGIC_NUMBER, dataInputView.readInt());
            j2 += paneSize - 1;
            for (int readInt2 = dataInputView.readInt() - 1; readInt2 >= 0; readInt2--) {
                this.keySerializer.deserialize(dataInputView);
                this.restoredFromLegacyAlignedOpRecords.add(new StreamRecord<>(getStateDescriptor().getSerializer().deserialize(dataInputView), j2));
            }
            readInt--;
        }
    }

    private long getPaneSize() {
        long size;
        Preconditions.checkArgument(this.legacyWindowOperatorType == LegacyWindowOperatorType.FAST_ACCUMULATING || this.legacyWindowOperatorType == LegacyWindowOperatorType.FAST_AGGREGATING);
        if (this.windowAssigner instanceof SlidingProcessingTimeWindows) {
            SlidingProcessingTimeWindows slidingProcessingTimeWindows = (SlidingProcessingTimeWindows) this.windowAssigner;
            size = ArithmeticUtils.gcd(slidingProcessingTimeWindows.getSize(), slidingProcessingTimeWindows.getSlide());
        } else {
            size = ((TumblingProcessingTimeWindows) this.windowAssigner).getSize();
        }
        return size;
    }

    private static void validateMagicNumber(int i, int i2) throws IOException {
        if (i != i2) {
            throw new IOException("Corrupt state stream - wrong magic number. Expected '" + Integer.toHexString(i) + "', found '" + Integer.toHexString(i2) + '\'');
        }
    }

    private void restoreFromLegacyWindowOperator(DataInputViewStreamWrapper dataInputViewStreamWrapper) throws IOException {
        Preconditions.checkArgument(this.legacyWindowOperatorType == LegacyWindowOperatorType.NONE);
        int readInt = dataInputViewStreamWrapper.readInt();
        this.restoredFromLegacyEventTimeTimers = new PriorityQueue<>(Math.max(readInt, 1));
        for (int i = 0; i < readInt; i++) {
            this.restoredFromLegacyEventTimeTimers.add(new Timer<>(dataInputViewStreamWrapper.readLong(), this.keySerializer.deserialize(dataInputViewStreamWrapper), this.windowSerializer.deserialize(dataInputViewStreamWrapper)));
        }
        int readInt2 = dataInputViewStreamWrapper.readInt();
        this.restoredFromLegacyProcessingTimeTimers = new PriorityQueue<>(Math.max(readInt2, 1));
        for (int i2 = 0; i2 < readInt2; i2++) {
            this.restoredFromLegacyProcessingTimeTimers.add(new Timer<>(dataInputViewStreamWrapper.readLong(), this.keySerializer.deserialize(dataInputViewStreamWrapper), this.windowSerializer.deserialize(dataInputViewStreamWrapper)));
        }
        int readInt3 = dataInputViewStreamWrapper.readInt();
        for (int i3 = 0; i3 < readInt3; i3++) {
            dataInputViewStreamWrapper.readLong();
            dataInputViewStreamWrapper.readInt();
        }
        if (LOG.isDebugEnabled()) {
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            if (this.restoredFromLegacyEventTimeTimers != null && !this.restoredFromLegacyEventTimeTimers.isEmpty()) {
                LOG.debug("{} (taskIdx={}) restored {} event time timers from an older Flink version: {}", new Object[]{getClass().getSimpleName(), Integer.valueOf(indexOfThisSubtask), Integer.valueOf(this.restoredFromLegacyEventTimeTimers.size()), this.restoredFromLegacyEventTimeTimers});
            }
            if (this.restoredFromLegacyProcessingTimeTimers == null || this.restoredFromLegacyProcessingTimeTimers.isEmpty()) {
                return;
            }
            LOG.debug("{} (taskIdx={}) restored {} processing time timers from an older Flink version: {}", new Object[]{getClass().getSimpleName(), Integer.valueOf(indexOfThisSubtask), Integer.valueOf(this.restoredFromLegacyProcessingTimeTimers.size()), this.restoredFromLegacyProcessingTimeTimers});
        }
    }

    public void reregisterStateFromLegacyWindowOperator() {
        if (this.restoredFromLegacyEventTimeTimers != null && !this.restoredFromLegacyEventTimeTimers.isEmpty()) {
            LOG.info("{} (taskIdx={}) re-registering event-time timers from an older Flink version.", getClass().getSimpleName(), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            Iterator<Timer<K, W>> it = this.restoredFromLegacyEventTimeTimers.iterator();
            while (it.hasNext()) {
                Timer<K, W> next = it.next();
                setCurrentKey(next.key);
                this.internalTimerService.registerEventTimeTimer(next.window, next.timestamp);
            }
        }
        if (this.restoredFromLegacyProcessingTimeTimers != null && !this.restoredFromLegacyProcessingTimeTimers.isEmpty()) {
            LOG.info("{} (taskIdx={}) re-registering processing-time timers from an older Flink version.", getClass().getSimpleName(), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
            Iterator<Timer<K, W>> it2 = this.restoredFromLegacyProcessingTimeTimers.iterator();
            while (it2.hasNext()) {
                Timer<K, W> next2 = it2.next();
                setCurrentKey(next2.key);
                this.internalTimerService.registerProcessingTimeTimer(next2.window, next2.timestamp);
            }
        }
        this.restoredFromLegacyEventTimeTimers = null;
        this.restoredFromLegacyProcessingTimeTimers = null;
    }

    public void reregisterStateFromLegacyAlignedWindowOperator() throws Exception {
        if (this.restoredFromLegacyAlignedOpRecords != null && !this.restoredFromLegacyAlignedOpRecords.isEmpty()) {
            LOG.info("{} (taskIdx={}) re-registering timers from legacy {} from an older Flink version.", new Object[]{getClass().getSimpleName(), Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.legacyWindowOperatorType});
            while (!this.restoredFromLegacyAlignedOpRecords.isEmpty()) {
                StreamRecord<IN> poll = this.restoredFromLegacyAlignedOpRecords.poll();
                setCurrentKey(this.keySelector.getKey(poll.getValue()));
                processElement(poll);
            }
        }
        this.restoredFromLegacyAlignedOpRecords = null;
    }

    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return this.trigger;
    }

    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return this.keySelector;
    }

    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return this.windowAssigner;
    }

    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
        return this.windowStateDescriptor;
    }
}
