Skip to main content

Command Palette

Search for a command to run...

Flink Side Output 收集不到数据的问题

Updated
1 min read
Flink Side Output 收集不到数据的问题

在 SideOutput 使用过程中,如果要想数据被收集到 OutputTag 中然后 getSideOutput() 获取一个新的流的话,在 context.output() 收集数据的算子之后不能再有算子,必须生成一个新的对象,不然不会有数据输出。在下面代码中,process1 的数据可以正常输出,但 process2 因为后面还有 map 函数,所以数据并没有被输出。

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> stream = env.addSource(new SourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> sourceContext) throws Exception {
                while (true) {
                    Random random = new Random();
                    sourceContext.collect(random.nextInt(10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {

            }
        });
        OutputTag<String> intTag = new OutputTag<>("intTag", TypeInformation.of(String.class));
        SingleOutputStreamOperator<Integer> process1 = stream.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer integer, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) {
                context.output(intTag, "first" + integer);
            }
        });
        SingleOutputStreamOperator<Integer> process2 = stream.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer integer, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) {
                context.output(intTag, "second" + integer);
            }
        }).map(v -> v);
        process1.getSideOutput(intTag).print();
        process2.getSideOutput(intTag).print();

        env.execute();
    }
}

输出:

8> first4
1> first1
2> first3
3> first5
4> first3
5> first7
6> first6
7> first5

More from this blog

根据前、中、后序数组构造二叉树

根据两个遍历数组生成二叉树,主要是固定住一个根节点,然后去另一个数组查找下标,划分数组做左右子树,再递归执行左子树和右子树。 这里主要讨论的是使用切片的过程中如何确定切片的起始点,即切片的区间,利用的是左子树的长度。 前序和中序构造二叉树 105. 从前序与中序遍历序列构造二叉树 递归加切片, python 中可以使用 index 函数直接获取值的下标。 注意:切片是左闭右开区间,最后一个值取不到 切片的下标如何思考:利用左子树的长度来辅助思考。idx 是中序数组中的当前节点下标,所以左子...

Apr 3, 20242 min read
根据前、中、后序数组构造二叉树

二叉树的遍历

掌握两种方法进行二叉树的遍历,这里重点看迭代法是怎么写,迭代法使用栈来模拟递归中的栈,也可以使用一种通用方式进行前、中、后序遍历。 递归法 def dfs(root) { // 前序遍历 dfs(root.left) // 中序遍历 dfs(root.right) // 后序遍历 } 迭代法:迭代法是用 stack 栈来模拟递归栈 下面这种写法可以统一前序、中序、后序遍历方式的写法,只需要改变入栈顺序 前序遍历:中,左,右中序遍历:左,中,右后序遍历:左...

Apr 3, 20242 min read
二叉树的遍历

函数式编程在 Java 和 Go 中的应用

函数式编程是一种 "编程范式"(programming paradigm),就是如何编写程序的方法论。 函数式编程特点: 函数是"第一等公民" 只用"表达式",不用"语句" "表达式"(expression)是一个单纯的运算过程,总是有返回值;"语句"(statement)是执行某种操作,没有返回值。函数式编程要求,只使用表达式,不使用语句。也就是说,每一步都是单纯的运算,而且都有返回值。 没有"副作用" 所谓"副作用"(side effect),指的是函数内部与外部互动(最典型的情况,就...

Jun 26, 20237 min read
函数式编程在 Java 和 Go 中的应用

Untitled Publication

13 posts