apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

在Spring Cloud Stream上使用自定义Serde序列化聚合状态存储时出错

[我正在尝试使用Spring Cloud Stream创建一个简单的功能bean,该bean处理来自KStream和GlobalKTable的消息,将它们加入,聚合它们,并将结果输出到新的流中...

回答 1 投票 0


何时数据库在卡夫卡对中成为瓶颈?

我有Kafka流,可以进行中间计算,然后放入mongodb。问题是mongodb无法处理大量来自Kafka流的插入。您如何解决此问题?

回答 1 投票 0

Kafka Streams API:会话窗口不兼容的类型

我有以下代码段:groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds))。grace(Duration.ZERO)); KTable mergedTable = ...

回答 1 投票 0

Kafka如何向特定用户发送消息?

系统包含用户实体。每个用户都可以按类型获取消息。如何在卡夫卡组织这件事?我知道,我可以创建主题消息并通过关键的用户ID存储消息,但是如果有百万用户呢? ...

回答 2 投票 0

基于相同的密钥在Kafka中加入/分组多个主题

我有一个共享相同密钥的外部应用程序产生多个主题,并且它们共享此模型:主题1:使用unix ms时间戳作为密钥来产生消息。主题2:从主题1读取...

回答 1 投票 0

如何使用大写键将json数据下沉到Postgres表中?

假设我有一个主题(用户),该主题具有json数据但没有架构。数据示例:{“ id”:3151212170,“ name”:“ John Wick”}为了解决这个问题,我创建了一个流(user_stream)来从...

回答 1 投票 1

Kafka重新分区(用于基于密钥的分组依据)

[当我们基于某个键对流应用按功能分组时,kafka如何计算此值,因为相同的键可能出现在不同的分区中?我在看through()函数,基本上...

回答 1 投票 0

计算GlobalKTables的内存占用量

我有一个带有GlobalKTables的Kafka Streams应用程序。我想计算相同的内存占用量。使用SNAPPY压缩基本Kafka主题中的数据。我找不到...

回答 1 投票 0

列出给定的Kafka群集中的所有处理器拓扑

我是Kafka Streams的新手。我想连接到Kafka Cluster并阅读所有Stream拓扑。是否有允许这样做的API?我在看拓扑类,有没有办法...

回答 1 投票 0

如何在Apache Kafka中进行扇出?

我需要为所有消费者发送消息,但是在确定谁应该收到此消息之前,如何使用Kafka进行操作?我应该使用Kafks流过滤数据然后发送给消费者吗?据我所知...

回答 1 投票 0

我可以更改由Kafka-Connect推送的主题

我已经部署了Kafka-Cluster,并且已经在运行该产品,并且主题为“ existing-topic”。我正在使用Debezium的MongoDB-Source-Connector。在这里,我要做的只是推动CDC事件...

回答 1 投票 0

我可以测试kafka流抑制逻辑吗?

我的应用程序使用kafka流抑制逻辑。我想使用抑制来测试kafka流拓扑。正在运行uinit测试,我的拓扑未发出结果。 Kafka流逻辑... .suppress(...

回答 1 投票 0

如何在Kafka中为用户分配特定数据?

我有一个主题加入了Kafka Streams。流分析数据并将结果放入另一个主题“建议”。一项推荐可以分配给一个或多个用户。如何存储...

回答 1 投票 0

我应该创建更多主题还是更多分区?

Kafka从其他国家/地区获得订单。我需要按国家对这些订单进行分组。我应该创建更多具有国家名称的主题,还是要将一个主题具有不同的分区?另一个是...

回答 1 投票 1

我应该创建主题还是创建更多分区?

Kafka从其他国家/地区获得订单。我需要按国家对这些订单进行分组。我应该创建更多具有国家名称的主题,还是要将一个主题具有不同的分区?另一个是...

回答 1 投票 1

Kafka Streams不断进入错误状态,应用程序无法启动

很长时间以来,我一直在研究卡夫卡流。我陷入了一个问题,无法解决。我希望这个平台会有所帮助。所以场景就是这样。 ...

回答 1 投票 1

Kafka流不起作用:组协调器不断发现并且不可用/无效

最近,我们的(Kafka 1.1.1)代理崩溃了,我们的Kafka-stream应用程序停止运行。因此,我们手动停止了该应用程序以停止警报。 Kafka再次上线后,我们开始...

回答 1 投票 1

是否指定了Kafka Streams拓扑的处理顺序?

我想知道是否指定了流拓扑处理消息的顺序。示例://读取输入消息KStream inputMessages = builder ....

回答 1 投票 0

来自kafka主题的独特消息,并在一定时间间隔内

我有一个kafka主题,不同的生产者向它发出1500条消息/秒,每条消息具有两个固定的键RID和Date,(每个消息的其他键也有所不同...]]] >> < [

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.