在flink中的每条记录中追加属于同一组的元素数量

问题描述 投票:0回答:1

虽然有一个文件,其中每行都包含以逗号分隔格式的客户数据

customerId,Email

同一客户可以有多个电子邮件,因此同一客户具有不同电子邮件数据的多行如下所示,相同的客户数据将始终处于连续位置。

1234,[email protected]
1234,[email protected]
5678,[email protected]
9999,[email protected]
9999,[email protected]
9999,[email protected]

需要读取文件,并附加下游每行中每个客户的电子邮件总数,如下所示

1234,[email protected],2
1234,[email protected],2
5678,[email protected],1
9999,[email protected],3
9999,[email protected],3
9999,[email protected],3

这是在 flink 中构建此管道的好方法。 (将有数百万行)

java apache-flink flink-streaming
1个回答
0
投票

如果您无法为此使用 Table API,那么我将通过客户 ID 进行键控,并拥有一个尺寸非常大的翻滚窗口。接下来是

.process(new MyProcessWindowFunction())
,您可以在其中迭代每个元素以在发出结果之前计算计数。

© www.soinside.com 2019 - 2024. All rights reserved.