Flink - 为什么有界流打印顺序不正确?

问题描述 投票:0回答:1

在下面的代码中:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Tuple2<String, Integer>> dataStream = env.fromElements(
        Tuple2.of("01", 1),
        Tuple2.of("02", 2),
        Tuple2.of("03", 3),
        Tuple2.of("04", 4),
        Tuple2.of("05", 5)
        );  
    dataStream.print();

输出显示:

    Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1227405543]
08/26/2023 19:00:45     Job execution switched to status RUNNING.
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to SCHEDULED
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to SCHEDULED 
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to DEPLOYING 
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to SCHEDULED
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to DEPLOYING
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to RUNNING 
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to RUNNING
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to RUNNING
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to RUNNING
5> (05,5)
1> (01,1)
3> (03,3)
2> (02,2)
4> (04,4)
08/26/2023 19:00:45     Source: Collection Source(1/1) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(6/8) switched to FINISHED 
08/26/2023 19:00:45     Sink: Unnamed(5/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(8/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(7/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(1/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(3/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(2/8) switched to FINISHED
08/26/2023 19:00:45     Sink: Unnamed(4/8) switched to FINISHED
08/26/2023 19:00:45     Job execution switched to status FINISHED.

  1. 为什么输出显示流乱序?如何开启排序功能?

    5> (05,5)
    1> (01,1)
    3> (03,3)
    2> (02,2)
    4> (04,4)
    
  2. 根据跟踪,为什么启用多个接收器(几乎 8 个)?

java apache-flink flink-streaming
1个回答
0
投票

Flink中有一个并行的概念,你可以认为是多个线程同时工作。当代码中没有明确并行度时,会使用CPU的数量作为默认的并行度。这也解释了为什么有8个水槽。

所以在你的代码中,这五个元组是在不同的线程中执行的,所以多个线程同时工作,最终的输出是不有序的。

当这些元组在同一个线程中执行时,它们的输出结果是有序的,这就是你定义的元素的顺序。你可以通过这样设置将全局并行度设置为1

env.setParallelism(1);
,但是这样会降低处理效率。

如果想实现多并行度下的order,只能在sink时改变并行度,在同一个线程中处理。

© www.soinside.com 2019 - 2024. All rights reserved.