与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
我可以在Kafka Streams中使用处理器api并从一个kafka集群中的主题读取数据并将数据管道传输到另一个kafka集群中的kafka主题吗?使用处理器api是可行的吗?我知道, ...
如何在Azure Cloud上加入Kafka KStream和纯文件缓存?
我正在开发一个日志丰富的Kafka Stream工作。计划是使用Azure Blob上的文件缓存来丰富Kafka KStream的日志条目。我的理解是我必须加载缓存...
什么时候Kafka Streams de / serialize
在什么情况下Kafka Streams程序需要序列化/反序列化?假设我们有以下简单程序:KStream stream = ...; Kstream stream2 = ...
要用Java创建kafka流状态存储,我可以这样做:最终的KGroupedStream wordCounts = textLines .flatMapValues(value - > Arrays.asList(pattern.split(value ....
我正在为书库编写应用程序,我有2个微服务:memberService和bookService。 memberService创建一个ktable成员(通过聚合另一个主题的更改消息构建)...
Kafka Streams主题中的多个对象和deserizalization
在我的kafka流应用程序中,我使用一个主题用于多个对象类型,seriazliazed为JSON。我使用类名作为键,我的想法是消费者只会过滤传入条目的子集...
嵌入式Kafka迁移状态存储在具有不同@StreamListener的两个实例之间
我有一个SpringBoot应用程序,它有两个通过Spring Cloud映射的流处理器。每个处理器都有自己的@StreamListener用于不同的主题。一个处理器将聚合数据写入可查询...
我正在使用kafka 0.11。我在kafka-streams-0.11.0.0.jar中找不到StoreBuilder类。知道这是什么问题吗?
Apache Kafka Streams和事件采购,CQRS和验证
我们有几个遗留应用程序,主要由GUI +服务层+ RDMS组成。随着时间的推移,添加了一些批处理以在不同数据库之间同步/传输数据,依此类推。通常 ...
我们有一个流拓扑,可以在多台机器上运行。我们将时间窗口聚合结果存储到状态存储中。由于状态存储正在存储本地数据,因此聚合应该是......
我们正试图从我们的经典架构J2EE应用服务器/关系数据库转移到Kafka。我有一个用例,我不确定如何继续......我们的应用......
我注意到Kafka记录有一个CRC字段。如果日志文件中的记录损坏(例如,消息中间的一个位被翻转),我期望在流应用程序中看到...
我使用Kafka流DSL和地图来转换KStream 到KStream 。在ValueMapper函数中,我简单地返回新的ValueMapper(“key”,“some constant string”),...
如何映射输入KStream 成 使用KeyValueMapper?
接收我想在CarClass上映射的Json数据并想要创建新的流但是map方法不允许我在自定义数据类型上映射类型中的方法映射(KeyValueMapper>)...
我编写了一个流应用程序来与5个分区的5个代理的集群上的主题交谈。我在这里尝试了多种组合,例如10个应用程序实例(在10台不同的机器上)...
使用带有更改日志的RocksDb状态存储时,Kafka Stream提供哪些保证?
我正在构建一个Kafka Streams应用程序,它通过将每个新计算对象与最后一个已知对象进行比较来生成更改事件。因此,对于输入主题上的每条消息,我都会更新一个对象......
有没有办法使用千分尺在telegraf中获取kafka流和骆驼指标
我在我的应用程序中有Kafka流和骆驼,我想从中获取一些指标并通过telegraf将其发送到涌入。对于我的应用程序中的所有其他指标,我们使用千分尺。是......
Kafka-streams:设置要删除的内部主题清理策略不起作用
我使用kafka流减少功能,它创建一些状态存储更改日志kafka内部主题(如app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog)。我想设置保留字节和......
Kafka Stream with Avro in JAVA,schema.registry.url“没有默认值
我有我的Kafka Stream应用程序的以下配置属性config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId); config.put(...
我有两个流:Stream1:[KSTREAM-MAP-0000000004]:1,{“id”:1,“name”:“john”,“age”:26} [KSTREAM-MAP-0000000004]:2,{“ id“:2,”name“:”jane“,”age“:24} [KSTREAM-MAP-0000000004]:3,{”id“:3,...