与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
在Spring Cloud Stream上使用自定义Serde序列化聚合状态存储时出错
[我正在尝试使用Spring Cloud Stream创建一个简单的功能bean,该bean处理来自KStream和GlobalKTable的消息,将它们加入,聚合它们,并将结果输出到新的流中...
我有Kafka流,可以进行中间计算,然后放入mongodb。问题是mongodb无法处理大量来自Kafka流的插入。您如何解决此问题?
我有以下代码段:groupedStream.windowedBy(SessionWindows.with(Duration.ofSeconds(config.joinWindowSeconds))。grace(Duration.ZERO)); KTable mergedTable = ...
系统包含用户实体。每个用户都可以按类型获取消息。如何在卡夫卡组织这件事?我知道,我可以创建主题消息并通过关键的用户ID存储消息,但是如果有百万用户呢? ...
我有一个共享相同密钥的外部应用程序产生多个主题,并且它们共享此模型:主题1:使用unix ms时间戳作为密钥来产生消息。主题2:从主题1读取...
假设我有一个主题(用户),该主题具有json数据但没有架构。数据示例:{“ id”:3151212170,“ name”:“ John Wick”}为了解决这个问题,我创建了一个流(user_stream)来从...
[当我们基于某个键对流应用按功能分组时,kafka如何计算此值,因为相同的键可能出现在不同的分区中?我在看through()函数,基本上...
我有一个带有GlobalKTables的Kafka Streams应用程序。我想计算相同的内存占用量。使用SNAPPY压缩基本Kafka主题中的数据。我找不到...
我是Kafka Streams的新手。我想连接到Kafka Cluster并阅读所有Stream拓扑。是否有允许这样做的API?我在看拓扑类,有没有办法...
我需要为所有消费者发送消息,但是在确定谁应该收到此消息之前,如何使用Kafka进行操作?我应该使用Kafks流过滤数据然后发送给消费者吗?据我所知...
我已经部署了Kafka-Cluster,并且已经在运行该产品,并且主题为“ existing-topic”。我正在使用Debezium的MongoDB-Source-Connector。在这里,我要做的只是推动CDC事件...
我的应用程序使用kafka流抑制逻辑。我想使用抑制来测试kafka流拓扑。正在运行uinit测试,我的拓扑未发出结果。 Kafka流逻辑... .suppress(...
我有一个主题加入了Kafka Streams。流分析数据并将结果放入另一个主题“建议”。一项推荐可以分配给一个或多个用户。如何存储...
Kafka从其他国家/地区获得订单。我需要按国家对这些订单进行分组。我应该创建更多具有国家名称的主题,还是要将一个主题具有不同的分区?另一个是...
Kafka从其他国家/地区获得订单。我需要按国家对这些订单进行分组。我应该创建更多具有国家名称的主题,还是要将一个主题具有不同的分区?另一个是...
Kafka Streams不断进入错误状态,应用程序无法启动
很长时间以来,我一直在研究卡夫卡流。我陷入了一个问题,无法解决。我希望这个平台会有所帮助。所以场景就是这样。 ...
最近,我们的(Kafka 1.1.1)代理崩溃了,我们的Kafka-stream应用程序停止运行。因此,我们手动停止了该应用程序以停止警报。 Kafka再次上线后,我们开始...
我想知道是否指定了流拓扑处理消息的顺序。示例://读取输入消息KStream inputMessages = builder ....
我有一个kafka主题,不同的生产者向它发出1500条消息/秒,每条消息具有两个固定的键RID和Date,(每个消息的其他键也有所不同...]]] >> < [