SCDF聚合器仅发出最后一条记录

问题描述 投票:0回答:1

我目前正在试验一种用例,我们必须先将多个输入消息聚合到一个用例中,然后再将其写入文件系统。为此,我正在尝试使用此处提供的聚合器模块https://repo.spring.io/release/org/springframework/cloud/stream/app/aggregator-processor-kafka/2.1.1.RELEASE/

我的环境是在Docker Dataflow 2.4.1.Release和Skipper 2.3.1.Release上运行的本地SCDF / Kafka

流定义很简单,如下所示

:inputTopic>聚合器>:outputTopic

在聚合器属性中提供这些属性值。

release: size() == 5
group-timeout: 5000
correlation: headers.headerName
aggregation: T(org.springframework.util.StringUtils).collectionToDelimitedString(#this.![new String(payload)],'')
message-store-type: simple

我有两个问题

  1. 尽管我给出的发布大小为5,如果说10消息,我看不到我的输出主题来接收两条消息
  2. 输出主题仅包含聚集头的最后一条消息,而不包含此列表的串联值。

请提出是否还有其他配置可帮助实现汇总输出。

另一个观察结果是,如果我有流作为

http | aggregator > :topic

相同的聚合器配置可以正常运行,并且可以根据需要运行。但是按照我们的方式,数据是通过使用KStreams的处理器放入主题中的,并且不确定是否存在使用这些默认启动应用程序生成标头并将其读取到自定义处理器的方法。

在聚合器调试期间的另一项观察结果是,correlationKey是通过hashCode获得或比较的,而不是作为字符串获取的,不确定是错误还是功能。但这是导致预期行为中断的原因。

spring-cloud-stream spring-cloud-dataflow
1个回答
0
投票

对于任何寻求答案的人,我要做的就是将相关性更改为

new String[headers.headerName]

并且将输入值序列化器显式配置为bytearrayserializer

但是我来自Spring团队的问题是,如果我不必触摸聚合器,是否可以在2.2.4引导中开发的Kstream应用程序的输出生产者属性中设置一个属性,以将标题作为字符串而不是字节发送数组?

我使用以下API设置标题

context.headers().add(key, value.getBytes())

我尝试过

producer.header-mode: to none/headers.

但是我的汇总器仍未汇总,仅提供最新记录。只有更改了聚合器属性后,它才起作用。但我想避免更改聚合器属性,因为该流不归我们所有,并且旧版应用程序可以很好地用于其他旧版流。

© www.soinside.com 2019 - 2024. All rights reserved.