apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

如何从Kafka Stream变更日志主题读取数据?

我使用Kafka Stream来处理我的主题A,并使用inMemoryKeyValueStore。 builder.addStateStore(Stores.keyValueStoreBuilder( //Stores.persistentKeyValueStore("AccurateADCounts"), ...

回答 1 投票 0

kafka 流处理器内的异步 http 调用并将响应转发到下一个处理器

我有一个kafka流应用程序(用java springboot编写),它有3个处理器。在第二个处理器中,我想以异步方式调用rest api,当响应到来时,我想...

回答 1 投票 0

卡夫卡流。如何使用抑制器在聚合窗口中发出最终结果

令我惊讶的是,我意识到“抑制”运算符不会在窗口关闭时发出最后一个事件,而是仅当在分区上发布另一个事件时,流的任务才会发出...

回答 1 投票 0

处理并忽略 Kafka Streams 中的 UNKNOWN_TOPIC_OR_PARTITION 错误

我正在使用 Kafka Streams 应用程序,我们使用基于消息头的动态主题确定。在我们的设置中,在应用程序运行时删除主题是正常的。

回答 1 投票 0

spring-cloud-stream-binder-kafka-streams 消费者在发生 RuntimeException 时关闭

spring-cloud-stream-binder-kafka-streams 当消费者发生异常时,消费者停止并进入 EMPTY 状态。我想测试重试机制,但它没有按预期工作。 (https://docs.s...

回答 1 投票 0

有没有办法同步具有kafka流的应用程序以避免重复的消息处理?

在我的 Spring Boot 应用程序中,我使用了 kafka 流。它首先按键对来自某个主题的消息进行分组,根据一定的时间间隔对它们进行窗口化,使用reduce仅保留最新的m...

回答 1 投票 0

golang 中的 kafka 流

我正在尝试使用golang在Go中创建kafka流客户端。据我所知,只有使用 Java 客户端才有可能实现这一点。我做了一些搜索,发现了一些其他第三方库......

回答 2 投票 0

使用 Kafka Streams 仅基于密钥过滤和转发 Kafka 消息

我想了解并可能改进我目前正在处理的一些 Kafka Streams 代码。现在我不完全确定反序列化和序列化的完整生命周期

回答 1 投票 0

在没有 Kafka-Streams 的情况下在 Kafka 上进行共同分区

我很好奇 kafka 是否默认在消费者组内进行共同分区,或者这是否是 kafka-streams 添加的功能。 例如:假设我有一个消费者组 group-1 并且该组消费

回答 1 投票 0

当事件时间在 ts 到 ts -1 分钟之间的所有消息中满足条件时,kafka 会生成消息

我想要处理kafka消息,考虑到eventTime,我在其中接收非线性格式的数据 我想在收到以下 JSON 格式的消息后,如果我得到一个

回答 1 投票 0

使用 Kafka Stream 按时间戳聚合事件

我有两个包含一些指标事件的主题。每个指标都是在特定时间戳生成的。我需要合并一些指标;唯一的共同键是时间戳。 我创建了两个 KStream wi...

回答 1 投票 0

Kafka 流应用程序想要检索错误的模式

我们在其中一个 Kafka 流应用程序中看到这样的错误: org.apache.kafka.streams.errors.StreamsException:进程中捕获异常。 taskId=0_6,处理器=KSTREAM-SOURCE-0000000000,主题=

回答 1 投票 0

我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题

我在使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题 该方法是否应该将左流中的所有记录保留 24 小时以查找匹配

回答 1 投票 0

Kafka Streams:通过两个或多个外键重新设置 KTable

假设有一个(简化的)A类,如下所示: A类{ 私人长ID; 私有字符串一些内容; 私人长fk1; 私人长fk2; // 相应的 getter 和 setter } fk1 和 fk2 ...

回答 1 投票 0

为物化状态存储设置 lz4 主题压缩

假设 Quarkus Kafka Streams 应用程序通过物化进行某种重新键入和聚合,如下所示: 最终 KTable groupedCommunications =

回答 1 投票 0

Kafka Streams 主题保留问题:动态主题配置和未发布的数据

我在与 Spring Cloud Stream 集成的 Kafka Streams 应用程序方面遇到问题。问题围绕着一个特定的主题,它的大小不断增加并且从不释放数据:

回答 1 投票 0

Kafka Streams KTable-KTable FK join 分区主题 - 分区路由问题?

给定两个实体代理和配置: 类代理机构{ 长 ID; // PK UUID配置Id; // FK -> 配置.id // ... } 类配置{ UUID ID; // PK // ... }

回答 1 投票 0

Kafka Streams 速率限制消耗

我试图根据条件限制我对某个主题的消耗率。 例如: 如果消息包含:“foo” 我想每秒消耗不超过 5 条消息(全球 - 意思是...

回答 1 投票 0

声明检查模式可以与 Kafka Streams 一起使用吗?

在大多数围绕 kafka 流的准备工作中,不建议您在 kafka 流应用程序中请求/响应,或者更一般地在流中与外部系统同步交互

回答 1 投票 0

Kafka KStreams 左连接没有给我预期的输出

直播1: 键:A,值:1 键:B,值:2 键:A,值:3 键:B,值:4 直播2: 键:A,值:X 键:B,值:Y 我得到的输出: 值 1:1,值 2:X 值 1:2,值 2:Y 值 1:3,值...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.