与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我有一个关于测试kafak流的问题,特别是测试kafka流内部的连接。 我有一个员工的小用例,我想加入保险。输出将是...
在使用 PySpark 消费来自 Kafka 的消息时处理架构演变
我是卡夫卡的新手。目前我正在处理一个要求 - 用例: 我正在使用来自 Kafka 的消息(消息由上游团队在 Kafka 中生成)。上游团队不维护...
来自 Kafka 主题的 Spark 消费者流不断重置偏移量
所以我正在运行一个 spark 流程序,并使用 readStream 方法的以下选项从 Kafka 主题读取流。 选项= { ... ... “kafka.security.
无法使用 MySQL 连接器 Kafka 中的 where 子句运行查询
JSON 文件: { “名称”:“mysql-jdbc”, “配置”:{ “connector.class”:“io.confluent.connect.jdbc.JdbcSourceConnector”, “连接。你...
使用调度程序和 kafka 时勇敢的行李引发 outOfMemoryError
Vsem 女贞。 V svoyem prilozheniye ya ispol'zuyu kafka stream i standartnyy scheduler postavlyayemyy spring。 Pri rabote Scheduler otpravlyayutsya soobshcheniya v kafku a potom vychityvayutsya ot t...
用于查看来自微服务的所有 Kafka 输入和输出主题的软件 [关闭]
作为全面了解我组织中每个微服务使用的所有 Kafka 主题的一部分,我正在寻找一种自动化工具,它可以扫描并提供我...
如何将 Kafka 流的简单整数密钥转换为复杂的 Avro 类型密钥
我有一个流式应用程序拓扑结构,它使用一个带有整数键和 AVRO 主体的简单主题。我想操纵流并使用复杂的 AVRO 密钥写入主题。但是我...
我写了一个带有 kafka 流的 Quarkus 应用程序(2.16 版)。由于 kafka 主题中的大量消息,我的应用程序的消费者组计数滞后。所以入住后...
Kafka Streams 开启 StateStore 日志循环
我有以下拓扑定义,环境中有两个应用程序实例: KStream stream = kStreamBuilder.stream(inputTopic); 流。
即使 Eaxctly-once 被禁用,Kafka Streams 是否会保证在有状态处理器中至少处理一次?
由于基础设施限制,我们在没有启用 EOS 的情况下运行 kafka 流应用程序时,就会想到这个问题。在使用 transformer/
Kafka Streams Yearly time Window
我们正在处理的一个应用程序的要求是,聚合以窗口方式发生,并且窗口大小可能每月/每季度/半年/每年变化。 卡夫卡的...
是否有可能具有执行以下操作的拓扑(简化示例): / --> branch1 --> 输出接收器 / (
我觉得浮士德有像kafka stream这样的拓扑:enter image description here 我很困惑,因为我使用浮士德,你怎么看?谢谢你的时间
当一些流线程死亡时(例如因为一个异常),我不想继续,而是重新启动进程。为了做到这一点,我需要识别这种状态。我知道我可以使用...
I want to join two topic streams (left join) and do a window-based aggregation over the joined stream. However, the aggregation is counting some of the messages twice as during join some messages are ...
Kafka Connect和Kafka Streams提交间隔之间有关系吗?
假设我们有一个Kafka Sink Connector从一个TopicA主题中填充数据库,一个Kafka Stream App推送更新到TopicA主题。我想知道如果。数据更新到...
当使用多个输入主题时,输入主题的消息速率不同,是否会影响kafka流的处理速度?
在我的kafka流应用中,我有10个输入主题(每个mysql表一个主题),我从这些主题中读取信息。某些topic的消息率很低,而其他topic的消息率稍高。很少有一...
kafka streams rocksdb jmx metrics
我正在尝试调试一个与kafka流有状态应用有关的性能问题。该应用程序查询了许多stattores(接近35个)。所以,为了找出慢的原因......
我想实现的是确保我的Kafka流消费者不会有滞后。我有一个简单的Kafka流应用,将一个主题以GlobalKTable的形式具体化为存储。当我...
kafka流中stattore和changelog主题的用途?
我有一个kafka流应用,它使用stateStore(由RocksDB支持)。所有的流线程正在做的是从kafka主题中获取数据,并把数据放到state-store中。(有 ...