apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

我可以设置Kafka Stream consumer group.id吗?

我正在使用Kafka Stream库进行流媒体应用。我想设置kafka消费者群组ID。然后,我把Kafka流配置如下。 streamsCopnfiguration.put(StreamsConfig ....

回答 1 投票 1

流处理器(低级API)源处理器如何从主题获取数据?

我是kafka流处理器的新手,并且遇到了“拓扑”的关键概念。我创建了源处理器,它从“源主题”中读取如下:拓扑拓扑=新拓扑(); ...

回答 2 投票 1

Spring Kafka - 如何在生成消息时获取时间戳(事件时间)

我需要在kafka使用者应用程序中获取生成消息时的时间戳(事件时间)。我知道timestampExtractor,可以与kafka流一起使用,但是......

回答 1 投票 0

如何在我的Kafka流应用程序中引用自定义分区分组器?

按主题名称而不是分区ID自定义分组任务。如何在我的Kafka流应用程序中引用我的自定义分区分组器类?谢谢

回答 2 投票 0

更正流数据中的聚合视图

此问题与KSQL或流处理技术的聚合视图有关。当我们接收事件时,我们正在应用group by子句来聚合它们。现在有一个事件......

回答 1 投票 0

Streams经常重建商店

在流应用程序中,我使用交互式查询和状态存储,以便扩展并能够更快地使用主题中的数据。但是我常常在日志中看到警告:异常-...

回答 1 投票 0

当消息大小很大时,Kafka主题滞后会逐渐增加

我正在使用Kafka Streams Processor API构建Kafka Streams应用程序以从Kafka主题检索消息。我有两个具有相同Kafka Streams配置的消费者应用程序。 ...

回答 1 投票 1

Kafka流聚合与删除和/或密钥更改

我正在尝试定义一个kafka流,它接受来自某个主题的记录,例如EMPLOYEE,其中记录包含有关员工及其部门的属性,并将其转换为另一个主题,...

回答 1 投票 0

KafkaStreams应用程序有java线程开销

使用KafkaStreams拓扑消耗并在一些处理步骤之后产生结果到另一个kafka主题并配置:num.stream.threads:10 JConsole说有20个消费者...

回答 2 投票 1

访问Kafka流的KTable底层RocksDB内存使用情况

我有一个kafka流应用程序,目前需要3个主题并将它们聚合为KTable。这个应用程序驻留在马拉松的scala微服务中,已经分配了512 MB的内存来使用。 ...

回答 1 投票 1

如何只使用Java lambda在Kafka Streams中窗口输入?

我输入了使用Kafka Stream获取的数据。我需要实现的只是一个5秒的翻滚窗口,并将数据输出到Kafka主题。但是,我无法做到这一点......

回答 1 投票 0

Kafka stream groupByKey不适用于count()

我正在尝试基于键生成计数,使用下面的代码,此代码基于单词计数示例。奇怪的是,如果mapValues函数返回String,那么groupBy就像...

回答 1 投票 0

是否可以在Spring Cloud Stream中拥有多个@StreamListener?

我使用Sprin云strema Kstream。我测试了一个主题和一个@StreamListner。没关系。我修改了两个KStream Input的代码。 (两个@StreamListener)但是,春云错误.. *********************** ...

回答 4 投票 1

对于时间序列汇总/聚合,流处理是否优于批处理?

背景 - 我在Cassandra中存储的Apache Kafka中提取了时间序列数据。由于原始数据需要大量存储空间,因此我尝试聚合数据并按小时,每天创建......

回答 1 投票 0

如何始终使用kafka-streams中的最新偏移量

我们的要求是,如果kafka-stream应用程序正在使用分区,它应该从该分区的最新偏移开始消耗。这似乎可以使用......

回答 1 投票 1

如何通过多个StreamListener Spring Cloud Stream和Kafka流来收听多个主题

我想听听下面代码中的两个Kafka主题,并且有两个源事件需要处理并转换为另一个事件。所以我想把这两个事件合二为一......

回答 1 投票 0

Kafka Streams重新分配的线程数

我有一个Kafka Streams应用程序,从一个Kafka主题中读取5个分区。然后多次聚合/重新分配数据。我试图找到数量的建议......

回答 1 投票 2

声明库包的类时出现IllegalAccessError

我正在为一个未记录的代码进行维护,并且在Spring Boot中运行类时偶然发现了一个奇怪的错误。代码在执行时引发IllegalAccessError,但是......

回答 1 投票 0

Apache Beam over Apache Kafka Stream处理

Apache Beam和Apache Kafka在流处理方面有什么区别?我也试图掌握技术和程序上的差异。请帮我理解......

回答 2 投票 1

Kafka Streams:如何获得SessionWindow的第一个和最后一个记录?

默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60))返回每个传入记录的记录。结合.count()和.filter(),可以很容易地检索第一条记录。使用... 。

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.