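I am testing how fast Apache Flink (v1.8.2) can read messages from a Kinesis Data Stream. The stream has only one shard and contains 40,000 messages. Each message is under 5 KB.
I am reading the stream from the oldest message using TRIM_HORIZON, and I expected the application to read all messages quickly, since each shard supports a total read rate of up to 2 MB per second via GetRecords. With the connector configuration (SHARD_GETRECORDS_MAX = 400, SHARD_GETRECORDS_INTERVAL_MILLIS = 1000), the application should finish reading all messages within a couple of minutes. But for some reason, reading all the messages takes much longer.
Could you check what is wrong with my connector configuration? I appreciate your help.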
public static DataStream<ObjectNode> createKinesisStream(
        StreamExecutionEnvironment env) throws IOException {
    Properties properties = new Properties();
    properties.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
    properties.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "400");
    properties.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");
    properties.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE");
    properties.put(ConsumerConfigConstants.AWS_PROFILE_NAME, "d");
    return env.addSource(new FlinkKinesisConsumer<>(
            "stream1", new JsonNodeDeserializationSchema(), properties));
}
main() code:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(10000L);
DataStream<ObjectNode> source = AppConfig.createKinesisStream(env);
DataStream<ObjectNode> filteredStream = source
        .map(new CustomMap());
I put a counter in the discarding sink. In one fetch it read 27 messages (counter 829-855):
24 Mar 2020 08:11:50,519 INFO DiscardingSink:15 - 827
24 Mar 2020 08:11:50,519 INFO DiscardingSink:15 - 828
24 Mar 2020 08:11:51,631 INFO DiscardingSink:15 - 829
24 Mar 2020 08:11:51,631 INFO DiscardingSink:15 - 830
.
.
24 Mar 2020 08:11:51,639 INFO DiscardingSink:15 - 854
24 Mar 2020 08:11:51,639 INFO DiscardingSink:15 - 855
24 Mar 2020 08:11:52,749 INFO DiscardingSink:15 - 856
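One possible explanation is that something in your pipeline is applying backpressure to the source. To measure the capacity of the source alone, you could simplify the job to: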
source.addSink(new DiscardingSink<>());
where DiscardingSink is:
public static class DiscardingSink<OUT> implements SinkFunction<OUT> {
    @Override
    public void invoke(OUT value, Context context) throws Exception {
        // Intentionally empty: every record is dropped, so the sink adds no load.
    }
}
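To put the numbers from the question side by side, here is a rough back-of-the-envelope sketch in plain Java (no Flink dependency). It assumes the log excerpt is representative, i.e. about 27 records arrive per fetch and fetches are about 1118 ms apart (the gap between the 08:11:51,631 and 08:11:52,749 log lines). The class and method names are hypothetical and only serve the illustration.

public class KinesisReadEstimate {

    // Seconds needed to read totalRecords when each fetch returns
    // recordsPerFetch records and fetches are intervalMillis apart.
    static double secondsToDrain(long totalRecords, long recordsPerFetch, long intervalMillis) {
        // Ceiling division: a partial last batch still costs one fetch.
        long fetches = (totalRecords + recordsPerFetch - 1) / recordsPerFetch;
        return fetches * intervalMillis / 1000.0;
    }

    public static void main(String[] args) {
        long totalRecords = 40_000;

        // What the connector settings would allow if every fetch were full:
        // SHARD_GETRECORDS_MAX = 400, SHARD_GETRECORDS_INTERVAL_MILLIS = 1000.
        double configured = secondsToDrain(totalRecords, 400, 1000);

        // What the log excerpt suggests (assumed representative):
        // about 27 records per fetch, about 1118 ms between fetches.
        double observed = secondsToDrain(totalRecords, 27, 1118);

        System.out.printf("configured: %.0f s%n", configured);
        System.out.printf("observed:   %.0f s (%.1f min)%n", observed, observed / 60.0);
    }
}

Under these assumptions the configured settings work out to about 100 seconds for 40,000 records, while the observed rate works out to roughly 27 to 28 minutes. Running the simplified job with the discarding sink should show whether the small batches come from the source itself or from the rest of the pipeline.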