apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

在将记录发送到 kafka-stream 中的主题时设置标头

我需要处理kafka-stream中的请求记录,并且还需要告诉发送者处理的响应。我创建了一个 Rest 端点来接收请求。该端点使用 spring-kafka 的

回答 1 投票 0


Kafka Streams Binder:访问全局状态存储

与此问题相关: 我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 我使用以下代码创建了一个 GlobalKTable: @豆

回答 1 投票 0

KeyValueStore 如何允许写入操作(Kafka Streams)

我正在尝试了解kafka Stream的Statestore。我了解了它的一些基础知识。我查看了代码。 这是 StateStore 的声明: 公共接口 StateStore { } 其中之一...

回答 1 投票 0

Kafka Streams - 无法搜索状态存储

我有一个使用两个状态存储的 Kafka Stream 应用程序。我在 Strimzi 集群 (kafka:0.29.0-kafka-3.1.0) 上的 Openshift 上运行此应用程序时遇到问题。 这意味着当我收到 bp-addr 记录时...

回答 1 投票 0

kafka 流日志禁用 INFO

有什么办法可以禁用Kafka流处理摘要信息吗?因为它会占用大量磁盘空间 例如 INFO 21284 --- [-StreamThread-6] o.a.k.s.p.internals.StreamThread :流...

回答 1 投票 0

限制Kafka Streams的内存使用

有没有办法限制或定义kafka流应用程序的最大内存使用量?我已经启用了状态存储的缓存,但是当我在 Openshift 中部署时,我的 pod 上出现了 OOM 问题。我有

回答 1 投票 0

Kafka Stateless KStream:异常时如何提交直到失败点?

假设 Kafka 流按顺序发出事件 1,2,3,4,5。当事件 5 出错时,kafka 流处理器按预期退出。但是,直到事件 4 为止它都无法提交。当我重新启动时...

回答 1 投票 0

如何使用branch()方法获取下游

在kafka-streams中,流上的branch()方法已被弃用。 我从 COnfluence 中找到了一些代码片段,我现在应该使用 split() 和 Branch() 方法,如下所示: artikelEvents.split() ...

回答 1 投票 0

GlobalKTable 作为带有 Kafka Streams Binder 的 QueryableStore

我正在使用 spring-boot-starter-parent 版本 3.1.2 和 spring-cloud-stream-binder-kafka-streams 版本 4.0.3 绝大多数在线示例都显示使用 @Input 注释创建 GlobalKTable...

回答 1 投票 0

Kafka StreamsUncaughtExceptionHandler REPLACE_THREAD 与 SHUTDOWN_CLIENT

我有一个旧主题,其中包含损坏的消息,我需要完全重新处理该主题,忽略无法处理的消息。这是正确的未捕获异常处理策略

回答 1 投票 0

Kafka Streams 使用手动创建的内部主题,给出 TopicAuthorizationException

我使用 KStreams 来消费主题中的数据,对数据执行一些逻辑并写入 KTable。由于安全原因,应用程序无法获得创建内部主题的权限。 巴...

回答 1 投票 0

Quarkus Kafka Streams/反应式消息反序列化异常

嘿,所以我正在尝试使用 Kafka Streams 和 MP Reactive Messaging 来读取 Kafka 主题,然后生成回它。 卡夫卡流错误 - org.apache.kafka.streams.errors.

回答 1 投票 0

单元测试 KafkaStreams 给出 IllegalArgumentException:未知主题

我有一个应用程序,使用 KStream 从 Kafka 读取数据,根据标头过滤数据,然后写入 KTable。 公共拓扑 buildTopology() { KStream inputStream = bui...

回答 1 投票 0

Kafka Streams 节点上内存不足

我正在节点上运行一个 kafka 流应用程序,它耗尽了应用程序的内存, 我想在集群级别运行 kafka 流应用程序 作为国家,我怎样才能实现同样的目标......

回答 1 投票 0

Kafka Processor API 中 Header 有什么用?

我正在学习Kafka Processor API并在ProcessorContext中找到一个方法头。 标题() 返回当前输入记录的标题;可能 如果不可用则为 null 我什么...

回答 2 投票 0

Kafka Streams 中的状态存储与 Ktable

我是 Kafka 和 Kafka Streams 的新手。虽然我已经了解了 Kafka 和 Kafka Streams 的概念,并且在概念上感到自信,但有一件事让我感到困惑。是的,就是

回答 1 投票 0

防止 Kafka Streams Consumer 写入偏移量/等待一个流消耗完所有记录后再启动第二个流

这是一个关于流的二合一问题。 我正在开发一项由两个流组成的服务。一个(第一个)应该消耗整个主题,接收键/值对并存储它们的信息......

回答 1 投票 0

Kafka-Streams 消费者的记录拦截器

我正在寻找 Kafka-streams 来进行事件处理。我尝试为 Kakfa-Streams 添加一个拦截器(针对消费者)。 我添加了一个 RecordInterceptor,如下所示: configMap.put(consumerPrefix(

回答 2 投票 0

状态存储中的密钥重新平衡如何在 Kafka Streams 中进行分区扩展?

假设一个有状态运行的 Kafka 流有一个包含 16 个分区的输入主题,实例(或任务)的数量也是 16。据我所知,Kafka 的默认分区器

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.