Custom Window Triggers in Flink


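CountWithTimeTrigger: a custom trigger that fires when the window reaches an element count or a timeout elapses, whichever comes first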

package vehicle.excavate.operator;


import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountWithTimeTrigger<T, W extends Window> extends Trigger<T, W> {
    private final long maxCount;
    private final long timeoutMs;
    private final MapStateDescriptor<String, Long> mapStateDesc = new MapStateDescriptor<>("mapState", String.class, Long.class);
    private final String countStr = "count";
    private final String timeStr = "time";

    private CountWithTimeTrigger(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    @Override
    // The onElement() method is called for each element that is added to a window.
    public TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx) throws Exception {
        MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
        // set the default value if first call
        if (!mapState.contains(countStr)) {
            mapState.put(countStr, 0L);
        }
        if (!mapState.contains(timeStr)) {
            mapState.put(timeStr, Long.MAX_VALUE);
        }
        final long count = mapState.get(countStr);
        final long deadline = mapState.get(timeStr);

        // Read the time from Flink's processing-time clock so it matches the timers registered below.
        final long currentTimeMs = ctx.getCurrentProcessingTime();
        final long newCount = count + 1;

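        if (currentTimeMs >= deadline || newCount >= maxCount) {
            // Cancel the pending timeout timer, if any, so it does not linger after the batch has fired.
            if (deadline != Long.MAX_VALUE) {
                ctx.deleteProcessingTimeTimer(deadline);
            }
            return fire(mapState);
        }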

        if (deadline == Long.MAX_VALUE) {
            final long nextDeadline = currentTimeMs + timeoutMs;
            mapState.put(timeStr, nextDeadline);
            ctx.registerProcessingTimeTimer(nextDeadline);
        }

        mapState.put(countStr, newCount);

        return TriggerResult.CONTINUE;
    }

    @Override
    // The onEventTime() method is called when a registered event-time timer fires.
    public TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    // The onProcessingTime() method is called when a registered processing-time timer fires.
    public TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx) throws Exception {
        MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
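        // Guard against missing state: unboxing a null Long would throw a NullPointerException.
        final Long deadline = mapState.get(timeStr);
        // Only the timer that matches the current deadline fires; any other timer is ignored.
        if (deadline != null && deadline == time) {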
            return fire(mapState);
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    // Finally the clear() method performs any action needed upon removal of the corresponding window.
    public void clear(W window, TriggerContext ctx) throws Exception {
        MapState<String, Long> mapState = ctx.getPartitionedState(mapStateDesc);
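        // Guard against missing state: unboxing a null Long would throw a NullPointerException.
        final Long deadlineValue = mapState.get(timeStr);
        if (deadlineValue != null && deadlineValue != Long.MAX_VALUE) {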
            ctx.deleteProcessingTimeTimer(deadlineValue);
        }
        mapState.clear();
    }

    private TriggerResult fire(MapState<String, Long> mapState) throws Exception {
        mapState.put(timeStr, Long.MAX_VALUE);
        mapState.put(countStr, 0L);
        return TriggerResult.FIRE;
    }

    public static <T, W extends Window> CountWithTimeTrigger<T, W> of(long maxCount, long intervalMs) {
        return new CountWithTimeTrigger<>(maxCount, intervalMs);
    }
}
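
The count-or-deadline bookkeeping above can be exercised without a Flink cluster. The following is a minimal plain-Java sketch of that logic, not Flink code: the class `CountOrTimeoutModel` and its methods `onElement`, `onTimer`, and `reset` are hypothetical names introduced for illustration, and the caller passes the clock value in explicitly instead of reading it from a `TriggerContext`.

```java
/** Plain-Java model of the trigger's bookkeeping (hypothetical, not a Flink class). */
class CountOrTimeoutModel {
    private static final long NO_DEADLINE = Long.MAX_VALUE;

    private final long maxCount;
    private final long timeoutMs;
    private long count = 0L;
    private long deadline = NO_DEADLINE;

    CountOrTimeoutModel(long maxCount, long timeoutMs) {
        this.maxCount = maxCount;
        this.timeoutMs = timeoutMs;
    }

    /** Mirrors onElement(): returns true when the window should fire. */
    boolean onElement(long nowMs) {
        long newCount = count + 1;
        if (nowMs >= deadline || newCount >= maxCount) {
            reset();
            return true;
        }
        if (deadline == NO_DEADLINE) {
            // The first element of a new batch starts the timeout clock.
            deadline = nowMs + timeoutMs;
        }
        count = newCount;
        return false;
    }

    /** Mirrors onProcessingTime(): only a timer equal to the current deadline fires. */
    boolean onTimer(long timerMs) {
        if (timerMs == deadline) {
            reset();
            return true;
        }
        return false; // the timer no longer matches the current deadline
    }

    private void reset() {
        count = 0L;
        deadline = NO_DEADLINE;
    }

    public static void main(String[] args) {
        CountOrTimeoutModel m = new CountOrTimeoutModel(3, 1000);
        System.out.println(m.onElement(0));    // false: deadline set to 1000
        System.out.println(m.onElement(10));   // false
        System.out.println(m.onElement(20));   // true: third element reaches maxCount
        System.out.println(m.onTimer(1000));   // false: that batch already fired
        System.out.println(m.onElement(2000)); // false: deadline set to 3000
        System.out.println(m.onTimer(3000));   // true: timeout elapsed
    }
}
```

The model makes one property of the design visible: the deadline is measured from the first element of each batch, not from the most recent element, so a slow trickle of elements still flushes at most `timeoutMs` after the batch started.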

Usage:

source.windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountWithTimeTrigger.of(10, 1000)))
    .process(...)

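GlobalWindows assigns every element to the same single window, so that one window is reused for the whole job. PurgingTrigger clears the window's contents each time the trigger fires; without purging, the data in the window keeps accumulating.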

source.windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(CountWithTimeTrigger.of(10, 1000)))
    .process(new FakePlateOperator())
    .addSink(new MySink());
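
To make the effect of purging concrete, here is a toy plain-Java sketch that counts how many elements the window function would receive at each firing, with and without purging. It does not use Flink: the class `PurgeDemo`, the method `firingSizes`, and the list standing in for the window contents are all assumptions made for illustration, and only the count-based firing path is modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a window buffer (hypothetical, not part of Flink). */
class PurgeDemo {
    /** Returns the number of elements handed to the window function at each firing. */
    static List<Integer> firingSizes(int totalElements, int maxCount, boolean purge) {
        List<Integer> buffer = new ArrayList<>(); // stands in for the window contents
        List<Integer> sizes = new ArrayList<>();
        int sinceLastFire = 0;
        for (int i = 0; i < totalElements; i++) {
            buffer.add(i);
            sinceLastFire++;
            if (sinceLastFire >= maxCount) {
                sizes.add(buffer.size()); // the window function sees the whole buffer
                sinceLastFire = 0;
                if (purge) {
                    buffer.clear(); // behaves like FIRE_AND_PURGE
                }
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(firingSizes(9, 3, false)); // [3, 6, 9]
        System.out.println(firingSizes(9, 3, true));  // [3, 3, 3]
    }
}
```

Without purging, every firing re-delivers all earlier elements, which is usually not what a batching trigger on a global window is meant to do.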

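Alternatively, have the trigger itself return FIRE_AND_PURGE when it fires, in which case the PurgingTrigger wrapper is not needed: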

private TriggerResult fire(MapState<String, Long> mapState) throws Exception {
    mapState.put(timeStr, Long.MAX_VALUE);
    mapState.put(countStr, 0L);
    return TriggerResult.FIRE_AND_PURGE;
}
