apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

BindingsEndpoint kafka Stream spring boot

我猜。它在本地电脑上运行良好,但在云 k8s 上运行不正常,请帮忙,谢谢 在此输入图像描述 'org.springframework.cloud.stream.endpoint.BindingsEndpoint' 不能...

回答 1 投票 0

如何使用Spring Cloud Stream中的Kafka Streams绑定器将同一主题绑定到多个函数?

使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @豆 公共函数,KStream 使用 Spring Cloud Stream 和 Kafka Streams 绑定器,我想在另一个函数中处理函数的输出,如下所示: @Bean public Function<KStream<String, Double>, KStream<String, Double>> sqrt() { return numbers -> numbers.mapValues(Math::sqrt); } @Bean public Consumer<KStream<String, Double>> log() { return sqrt -> sqrt.foreach((key, value) -> log.info("{}: {}", key, value)); } 其中 sqrt() 输出数字的平方根,然后用 log() 记录。 application.yaml因此看起来像这样: spring: cloud: stream: function: bindings: sqrt-in-0: numbers sqrt-out-0: sqrt-numbers log-in-0: sqrt-numbers kafka: streams: bindings: sqrt: consumer: application-id: sqrtApplicationId log: consumer: application-id: logApplicationId 启动应用程序时,出现以下错误: The bean 'sqrt-numbers' could not be registered. A bean with that name has already been defined and overriding is disabled. Action: Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true 当然,现在将 definition-overriding 设置为 true 并不是一个正确的解决方案,并且它会失败并显示 IllegalStateException。 我该如何解决这个问题? 问题的重现可以在这里找到:https://github.com/cedric-schaller/dltawareprocessor-type-error 假设您有两个名为 numbers 和 sqrt-numbers 的 Kafka 主题,则以下配置应该有效。 spring: cloud: stream: bindings: sqrt-in-0: destination: numbers sqrt-out-0: destination: sqrt-numbers log-in-0: destination: sqrt-numbers kafka: streams: bindings: sqrt-in-0: consumer: application-id: sqrtApplicationId log-in-0: consumer: application-id: logApplicationId 您可以使用 spring.cloud.stream.function.bindings.. 覆盖默认绑定名称。例如,如果您想将绑定名称从 sqrt-in-0 更改为 input,您可以像 spring.cloud.stream.function.bindings.sqrt-in0-0: input 那样进行操作。不过,您仍然需要在覆盖的绑定上设置 destination(通过 spring.cloud.stream.bindings.input.destination)。 您遇到的特定异常是因为您试图重用已创建的绑定名称 - sqrt-numbers。

回答 1 投票 0

Kafka GroupTable 测试使用 ProcessorTopologyTestDriver 时生成额外消息

我编写了一个流,它接收消息并发送出已出现的键表。如果出现某些东西,它将显示计数为 1。这是我的生产 c 的简化版本...

回答 1 投票 0

为什么我的 Kafka Streams 拓扑无法正确重放/重新处理?

我有一个如下所示的拓扑: KTable users = topology.table(USERS); KStream joinRequests = topology.stream(JOIN_REQUESTS) .地图...

回答 2 投票 0

将 Kafka 输入流动态连接到多个输出流

Kafka Streams 中是否内置了允许将单个输入流动态连接到多个输出流的功能? KStream.branch 允许基于真/假预测进行分支...

回答 1 投票 0

合并多个相同的 Kafka Streams 主题

我有 2 个 Kafka 主题,从不同的来源流式传输完全相同的内容,这样我就可以在其中一个来源发生故障时获得高可用性。 我正在尝试将 2 个主题合并为 1 个输出...

回答 1 投票 0

使用 avro 作为带有 kafka 模式注册表的关键主题

我和我的团队最近遇到了用于主题键的 Avro 架构问题。我们更改了对键的评论,这完全破坏了我们的 Kafka Streams 连接,也破坏了我们主题的压缩......

回答 1 投票 0

Kafka 流卡住分区

我们有一组 kafka 流/spring-boot/spring-kafka 应用程序,在过去几天发生了一个事件,我们注意到一个主题的单个分区有数千个我......

回答 1 投票 0

已弃用 KStreams TransformerSupplier 至 ProcessorSupplier

鉴于 flatTransform 已被弃用,我正在尝试按照建议通过流程替换它。 我以前的 TransformerSupplier 看起来像这样: 公共类 MyTransformerSupplier 实现

回答 1 投票 0

六边形架构中的Kafka流

我正在六边形架构中创建服务,该服务消耗来自主题的数据。在同一个项目中,我想使用 kafka 流将几个主题合并为一个主题,然后使用其中的数据......

回答 1 投票 0

当监听器被移除时如何处理kafka主题

我有一个特定主题的卡夫卡消费者。由于一些原因我改变了消费者。现在我不再使用消息,而是使用一个 kafka-stream 来处理主题并生成新消息到

回答 1 投票 0

kafka集群非刷盘模式是否可以对某些主题生效?

如果我想让Kafka集群支持准实时消息转换(< 500 ms), do I need to set the Kafka cluster to the non-flushing disk mode? If I set non-flushing disk mode, Whether non-flu...

回答 1 投票 0

kafka Streams groupBy 内部做了什么?

假设有一个主题,其中不同文件的块全部混合在一起,由一个元组(FileId,Chunk)表示。 同一文件的块也可能有点乱序。 任务是聚合...

回答 1 投票 0

如何以避免从其变更日志主题重新创建状态存储的方式重新启动 KafkaStreams 消费者组

在多个节点托管具有持久状态存储的 KafkaStreams (0.10.2.1) 实例的部署中,重新启动所有节点同时避免重播整个过程的推荐方法是什么...

回答 2 投票 0

具有大型滑动窗口的Kafka Streams

我需要显示过去 3 个月、6 个月和 1 年中任何时间点的使用统计数据。我计划在上述持续时间内使用 KStream 滑动窗口。大多数例子我...

回答 2 投票 0

如何通过检查应用程序的所有实例来从状态存储中删除记录

我的应用程序有多个实例,每个实例都有其状态存储,我们在其中存储(键,值)信息,应用程序将使用这些信息进行功能处理。 我们的要求是删除特定条目

回答 1 投票 0

Do <stream>.toTable().toStream() 返回原始流并删除相同键的旧记录?

我有一个可以包含重复记录的KStream。我可以使用 .groupByKey().reduce() 删除具有相同键的旧记录,但想知道是否可以通过 .toTable().toStream 实现这一点...

回答 1 投票 0

Kafka Streams 状态存储获取操作什么也取不到,即使更改日志主题显示了一个值

我在我的拓扑中添加了一个持久键值存储。在不涉及太多细节的情况下,我将解释我面临问题的部分。我想查询国营商店。 当我收到

回答 1 投票 0

Ksqldb、kafka 流。将主题消息拆分并按条件发布到不同的主题

我有一个主题,比如说“topic_soure”。消息采用 json 格式。 所有消息的顶级字段都是相同的,但“数据”字段可能有不同的模型。 我不知道前...

回答 1 投票 0

我们可以将 KStream 转换为 Apache Kafka 中的全局 KTable 吗?

我正在尝试在 scala 中使用 kafka 流。我想知道是否可以将 KStream 转换为 GlobalKTable?

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.