apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

通用类型的 Kafka Streams Serdes

我为 Kafka 流写了一个 Serde。我的 Serde 类具有通用类型。当我在 API 方法中显式传递它(覆盖默认值)时它工作正常但当我通过

回答 0 投票 0

我们可以只为 kafka 禁用 log4j 日志吗

我正在使用以下 log4j.properties log4j.rootLogger=调试,标准输出 log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.

回答 4 投票 0

Kafka- 使用非键值将 Kafka 流与全局 K 表连接

我看过多个帖子,说明可以使用记录值而不是全局 k 表上的键将 kafka 流与全局 k 表连接起来 https://kafka.apache.org/20/documentation/st...

回答 1 投票 0

如何确定 InvalidProducerEpochException/ProducerFencedException 的根本原因以及如何修复它

我们有一个在 AWS 中运行的 Kafka 流 spring boot 应用程序。 springKafka版本:2.8.7 apacheKafkaClient版本:3.0.2 融合版本:5.5.5 中间的一些性能测试的一部分......

回答 0 投票 0

使用 Kafka Streams 在输出中设置时间戳

我在 Kafka 主题“原始数据”中获取 CSV,目标是通过在另一个主题“数据”中发送具有正确时间戳(每行不同)的每一行来转换它们。 目前,我有 2 str ...

回答 1 投票 0

Spring Cloud Stream kafka:如何为每个消费者主题创建 DLQ 配置?

以下属性没有按预期工作,我想在重试后为每个消费者主题创建一个 DLQ 溪流: 卡夫卡: 流: binder.deserializationExceptionHandler:sendToDlq 绑定: 李...

回答 0 投票 0

Kafka Streams 指标 - 重新平衡延迟和状态

我正在使用 Spring Boot 和 Micrometer 构建 Kafka Streams 应用程序。我想知道如何收集/公开以下指标: 每个任务从重新平衡到运行所需的延迟。理想...

回答 0 投票 0

从 Kafka Streams 向 Spring Beans 有效公开全球商店

我想通过普通的 Spring Beans 查询 Kafka Stream 全局故事,例如通过@Controller。 我找到了两个例子: SO:使用 Spring 访问 Kafka Stream State Store 自动装配

回答 0 投票 0

Kafka 使用 commit.interval.ms 自定义抑制行为

在Kafka streams中,如果我们有多个partition,想要根据一个key来聚合消息,只为key产生聚合的最终结果。我们不得不使用自定义抑制器使用

回答 0 投票 0

kafka streaming application 不会消耗所有主题,也不会抛出错误

我已经编写了 java springboot kafka 流处理器应用程序(Kubernetes 1 pod,扩展到 3 以查看是否有帮助,传入消息在上午 12 点后增加),它有几个输入目的地...

回答 0 投票 0

设置 S3 Source Connector 时找不到类

我正在尝试设置我的 Kafka S3 源连接器以从我的 S3 存储桶中提取文件。但是,当我在连接时检查连接器的状态时,我将其作为错误响应 error_code: 500

回答 0 投票 0

如果使用消息的 kafka 流应用程序有一些异常,如何从 apache Kafka 重新读取消息

我正在使用 kafka 流读取来自 kafka 的消息进行实时处理。 然后,如果在...中计算消息时出现错误,我会将此消息发送给其他 rest api(最终服务)

回答 1 投票 0

如何从单个记录创建多个记录?

假设我在流 A 中有一条具有以下架构的记录: {时间戳,json} 示例记录: {“行”:{“列”:[1678710830000,{“1”:“abc”,“2”:...

回答 0 投票 0

为什么 KStreams 的 outerJoin 滑动窗口的不完整结果在新的不相关记录稍后到达之前不会被刷新?

我有三个主题和一个从中读取的 KStream 应用程序。 x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。 关键是,在同一个窗口中...

回答 0 投票 0

为什么 KStreams 的 outerJoin 滑动窗口的不完整结果直到新的不相关记录稍后到达才发布?

我有三个主题和一个从中读取的 KStream 应用程序。 x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。 关键是,在同一个窗口中...

回答 0 投票 0

Kafka standby replica 清晰度寻求

我想了解一下 Kafka 备用副本的工作原理! 鉴于这种情况; 一个 Kafka 流应用程序; 从具有 10 个分区的源主题读取 写给...

回答 1 投票 0

是否可以使用窗口加入可选的多个(3+)KStreams?

我有三个主题和一个从中读取的 KStream 应用程序。 x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。 关键是,在同一个窗口中...

回答 0 投票 0

运行时订阅多个kafka主题

我想用 Java Quarkus 创建一个微服务来在运行时订阅多个主题。该服务将定期(每天一次)从 API 读取主题并更新主题以...

回答 1 投票 0

为 Springboot 应用程序的多个实例设置 Actuator Kafkastreams 健康检查

我们有 x 个 KafkaStreams Springboot 应用程序实例在我们的云环境中并行运行。我想用 Springboot Actuator 实现健康检查,它会返回

回答 0 投票 0

java.lang.ClassCastException:kafka.cluster.BrokerEndPoint 无法转换为 kafka.cluster.Broker:kafka streaming 和 spark

我在用 卡夫卡 0.8.2.1 火花 2.1.2 我试图运行一个代码,它将数据从 kafka 流式传输到 spark bu 我收到这个错误 文件“c:/Users/anish/OneDrive/Desktop/major project/

回答 0 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.