在下面的代码中:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(
Tuple2.of("01", 1),
Tuple2.of("02", 2),
Tuple2.of("03", 3),
Tuple2.of("04", 4),
Tuple2.of("05", 5)
);
dataStream.print();
输出显示:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1227405543]
08/26/2023 19:00:45 Job execution switched to status RUNNING.
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to SCHEDULED
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to SCHEDULED
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to DEPLOYING
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to RUNNING
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to RUNNING
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to RUNNING
5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)
08/26/2023 19:00:45 Source: Collection Source(1/1) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(6/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(5/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(8/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(7/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(1/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(3/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(2/8) switched to FINISHED
08/26/2023 19:00:45 Sink: Unnamed(4/8) switched to FINISHED
08/26/2023 19:00:45 Job execution switched to status FINISHED.
为什么输出显示流乱序?如何开启排序功能?
5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)
根据跟踪,为什么启用多个接收器(几乎 8 个)?
Flink中有一个并行的概念,你可以认为是多个线程同时工作。当代码中没有明确并行度时,会使用CPU的数量作为默认的并行度。这也解释了为什么有8个水槽。
所以在你的代码中,这五个元组是在不同的线程中执行的,所以多个线程同时工作,最终的输出是不有序的。
当这些元组在同一个线程中执行时,它们的输出结果是有序的,这就是你定义的元素的顺序。你可以通过这样设置将全局并行度设置为1
env.setParallelism(1);
,但是这样会降低处理效率。
如果想实现多并行度下的order,只能在sink时改变并行度,在同一个线程中处理。