package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.KeyMap;
import org.apache.flink.util.Collector;

/**
 * Keyed time panes whose per-key state is a single value of the element type,
 * combined with a {@link ReduceFunction}.
 *
 * <p>Elements are handed to {@code latestPane.putOrAggregate(...)} together with
 * their key and the reducer. When a window is evaluated, the values are either
 * emitted directly from the latest pane (if there are no previous panes) or
 * reduced across all panes by an {@link AggregatingTraversal}.
 *
 * @param <Type> type of the elements; also used as the aggregate and result type
 * @param <Key> type of the key extracted from each element
 */
@Internal
public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes<Type, Key, Type, Type> {

    /** Extracts the key under which an element is stored in the pane. */
    private final KeySelector<Type, Key> keySelector;

    /** Combines two values that belong to the same key. */
    private final ReduceFunction<Type> reducer;

    /**
     * Pass counter handed to {@code traverseAllPanes}; incremented after every window evaluation.
     *
     * <p>NOTE(review): this deliberately starts at 1 rather than 0 - presumably so that it is
     * ahead of whatever initial marker {@code KeyMap} entries carry. Confirm against
     * {@code KeyMap} before changing the initial value.
     */
    private long evaluationPass = 1;

    /**
     * Traversal evaluator that reduces all values seen for one key into a single
     * value and emits that value once the key is finished.
     *
     * @param <Key> type of the keys being traversed
     * @param <Type> type of the values being reduced and emitted
     */
    static final class AggregatingTraversal<Key, Type> implements KeyMap.TraversalEvaluator<Key, Type> {

        private final ReduceFunction<Type> function;
        private final Collector<Type> out;
        private final AbstractStreamOperator<Type> operator;

        /** Running aggregate for the key currently being traversed; {@code null} until its first value arrives. */
        private Type currentValue;

        /**
         * @param reduceFunction function used to combine the values of one key
         * @param collector collector that receives one result per key
         * @param abstractStreamOperator operator whose current key is set at the start of each key
         */
        AggregatingTraversal(ReduceFunction<Type> reduceFunction, Collector<Type> collector, AbstractStreamOperator<Type> abstractStreamOperator) {
            this.function = reduceFunction;
            this.out = collector;
            this.operator = abstractStreamOperator;
        }

        /**
         * Resets the running aggregate and sets {@code key} as the operator's current key.
         *
         * @param key the key whose values are about to be traversed
         */
        @Override
        public void startNewKey(Key key) {
            this.currentValue = null;
            this.operator.setCurrentKey(key);
        }

        /**
         * Folds the next value of the current key into the running aggregate.
         *
         * @param type the next value for the current key
         * @throws Exception if the reduce function fails
         */
        @Override
        public void nextValue(Type type) throws Exception {
            if (this.currentValue != null) {
                this.currentValue = this.function.reduce(this.currentValue, type);
            } else {
                // First value for this key: nothing to reduce against yet.
                this.currentValue = type;
            }
        }

        /**
         * Emits the aggregate of the current key to the collector.
         *
         * @throws Exception declared by the evaluator contract
         */
        @Override
        public void keyDone() throws Exception {
            this.out.collect(this.currentValue);
        }
    }

    /**
     * Creates panes that key elements with {@code keySelector} and combine them with {@code reduceFunction}.
     *
     * @param keySelector extracts the key of each element
     * @param reduceFunction combines two values that share a key
     */
    public AggregatingKeyedTimePanes(KeySelector<Type, Key> keySelector, ReduceFunction<Type> reduceFunction) {
        this.keySelector = keySelector;
        this.reducer = reduceFunction;
    }

    /**
     * Adds an element to the latest pane under its extracted key, passing the reducer along
     * so the pane can combine it with a value already stored for that key.
     *
     * @param type the element to add
     * @throws Exception if key extraction or the pane update fails
     */
    @Override
    public void addElementToLatestPane(Type type) throws Exception {
        final Key key = this.keySelector.getKey(type);
        this.latestPane.putOrAggregate(key, type, this.reducer);
    }

    /**
     * Emits one value per key for the current window and advances the evaluation pass.
     *
     * @param collector receives the per-key results
     * @param timeWindow the window being evaluated; not used by this implementation
     * @param abstractStreamOperator operator whose current key is set during multi-pane traversal
     * @throws Exception if the traversal or the reduce function fails
     */
    @Override
    public void evaluateWindow(Collector<Type> collector, TimeWindow timeWindow, AbstractStreamOperator<Type> abstractStreamOperator) throws Exception {
        if (this.previousPanes.isEmpty()) {
            // Single-pane case: emit the latest pane's values directly, without a traversal.
            for (KeyMap.Entry<Key, Type> entry : this.latestPane) {
                collector.collect(entry.getValue());
            }
        } else {
            // Multi-pane case: reduce each key's values across all panes.
            final AggregatingTraversal<Key, Type> evaluator =
                    new AggregatingTraversal<>(this.reducer, collector, abstractStreamOperator);
            traverseAllPanes(evaluator, this.evaluationPass);
        }
        this.evaluationPass++;
    }
}
