package org.apache.flink.streaming.api.windowing.triggers;

import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;

/**
 * A {@link Trigger} that fires when the delta between a stored reference element and the
 * incoming element, as computed by a {@link DeltaFunction}, exceeds a fixed threshold.
 *
 * <p>The reference element is kept in partitioned state under the name {@code "last-element"}.
 * It is set to the first element seen while the state is empty and is replaced only when the
 * trigger fires, so deltas are always measured against the element that caused the most recent
 * firing (or the first element if the trigger has not fired yet).
 *
 * <p>Timers are not used: both time callbacks always return {@link TriggerResult#CONTINUE}.
 *
 * @param <T> the type of elements this trigger inspects
 * @param <W> the type of {@link Window} this trigger operates on
 */
@PublicEvolving
public class DeltaTrigger<T, W extends Window> extends Trigger<T, W> {
    private static final long serialVersionUID = 1L;

    /** Computes the distance between the stored reference element and a new element. */
    private final DeltaFunction<T> deltaFunction;

    /** The trigger fires only when the computed delta is strictly greater than this value. */
    private final double threshold;

    /** Descriptor of the partitioned state that holds the reference element. */
    private final ValueStateDescriptor<T> stateDesc;

    /**
     * Creates the trigger; instances are obtained through {@link #of}.
     *
     * @param d the threshold the delta must exceed for the trigger to fire
     * @param deltaFunction the function used to compute the delta between two elements
     * @param typeSerializer the serializer used for the stored reference element
     */
    private DeltaTrigger(double d, DeltaFunction<T> deltaFunction, TypeSerializer<T> typeSerializer) {
        this.deltaFunction = deltaFunction;
        this.threshold = d;
        this.stateDesc = new ValueStateDescriptor<>("last-element", typeSerializer);
    }

    /**
     * Compares the incoming element with the stored reference element.
     *
     * @param t the incoming element
     * @param j the timestamp passed by the caller (not used by this trigger)
     * @param w the window the element was assigned to (not used by this trigger)
     * @param triggerContext the context used to access the partitioned state
     * @return {@link TriggerResult#FIRE} if the delta is strictly greater than the threshold,
     *     otherwise {@link TriggerResult#CONTINUE}
     * @throws Exception if accessing the state or computing the delta fails
     */
    @Override
    public TriggerResult onElement(T t, long j, W w, Trigger.TriggerContext triggerContext) throws Exception {
        // Typed handle instead of a raw ValueState, so value()/update() are checked against T.
        ValueState<T> lastElementState = triggerContext.getPartitionedState(this.stateDesc);

        // Read the state once; the value is needed for both the null check and the delta.
        T lastElement = lastElementState.value();
        if (lastElement == null) {
            // No reference element yet: remember this one and wait for the next element.
            lastElementState.update(t);
            return TriggerResult.CONTINUE;
        }

        if (this.deltaFunction.getDelta(lastElement, t) > this.threshold) {
            // The element that causes a firing becomes the new reference element.
            lastElementState.update(t);
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * Event-time callback; this trigger never reacts to it.
     *
     * @return always {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onEventTime(long j, W w, Trigger.TriggerContext triggerContext) {
        return TriggerResult.CONTINUE;
    }

    /**
     * Processing-time callback; this trigger never reacts to it.
     *
     * @return always {@link TriggerResult#CONTINUE}
     */
    @Override
    public TriggerResult onProcessingTime(long j, W w, Trigger.TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /**
     * Clears the stored reference element held in the partitioned state.
     *
     * @param w the window being cleared (not used by this trigger)
     * @param triggerContext the context used to access the partitioned state
     * @throws Exception if accessing the state fails
     */
    @Override
    public void clear(W w, Trigger.TriggerContext triggerContext) throws Exception {
        triggerContext.getPartitionedState(this.stateDesc).clear();
    }

    /**
     * Returns a description of the form {@code DeltaTrigger(<deltaFunction>, <threshold>)}.
     */
    @Override
    public String toString() {
        // Plain ")" literal: the closing parenthesis must not depend on an unrelated
        // library constant.
        return "DeltaTrigger(" + this.deltaFunction + ", " + this.threshold + ")";
    }

    /**
     * Creates a delta trigger from the given threshold and {@link DeltaFunction}.
     *
     * @param d the threshold the delta must exceed for the trigger to fire
     * @param deltaFunction the function used to compute the delta between two elements
     * @param typeSerializer the serializer used for the stored reference element
     * @param <T> the type of elements the trigger inspects
     * @param <W> the type of {@link Window} the trigger operates on
     * @return a new {@code DeltaTrigger}
     */
    public static <T, W extends Window> DeltaTrigger<T, W> of(double d, DeltaFunction<T> deltaFunction, TypeSerializer<T> typeSerializer) {
        return new DeltaTrigger<>(d, deltaFunction, typeSerializer);
    }
}
