Flink Side Output 收集不到数据的问题

在 SideOutput 使用过程中,如果要想数据被收集到 OutputTag 中然后 getSideOutput() 获取一个新的流的话,在 context.output() 收集数据的算子之后不能再有算子,必须生成一个新的对象,不然不会有数据输出。在下面代码中,process1 的数据可以正常输出,但 process2 因为后面还有 map 函数,所以数据并没有被输出。
public class StreamingJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> stream = env.addSource(new SourceFunction<Integer>() {
@Override
public void run(SourceContext<Integer> sourceContext) throws Exception {
while (true) {
Random random = new Random();
sourceContext.collect(random.nextInt(10));
Thread.sleep(1000);
}
}
@Override
public void cancel() {
}
});
OutputTag<String> intTag = new OutputTag<>("intTag", TypeInformation.of(String.class));
SingleOutputStreamOperator<Integer> process1 = stream.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer integer, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) {
context.output(intTag, "first" + integer);
}
});
SingleOutputStreamOperator<Integer> process2 = stream.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer integer, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) {
context.output(intTag, "second" + integer);
}
}).map(v -> v);
process1.getSideOutput(intTag).print();
process2.getSideOutput(intTag).print();
env.execute();
}
}
输出:
8> first4
1> first1
2> first3
3> first5
4> first3
5> first7
6> first6
7> first5




