与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
对于翻滚窗口,emitEarlywhenFull()的抑制如何工作?
我在翻滚窗口上使用抑制来获得聚合结果。我正在探索untilTimeLimit和untilWindowCloses进行压制。当缓冲区满时,我不希望我的流关闭。我有 ...
Spring Cloud Stream会产生不必要的复杂Kafka拓扑,为什么?
我有一个KStream应用程序,包含一堆KStream,连接和其他操作。我启用了logging.level.org.springframework.kafka.config = debug以验证正在生成的拓扑并且...
Kafka用例:从kafka连续读取,对消息执行解密,然后插入到db
Kafka用例:从Kafka(可能是一个流)连续读取,使用Java方法对消息值执行解密,然后插入到db(或者接收器或者我有一个Java REST API,可以...
我们正在使用Ktabke在kafka中进行聚合,它是非常基本的用法,并且已经从kafka doc中引用。我只是试图调查一下,如果一些消息消费失败,同时聚合我们如何...
Kafka Streams恰好一次TransactionalIdAuthorizationException
在将PROCESSING_GUARANTEE_CONFIG设置为exact_once之后,我的kafka流应用程序表现良好,我的复制因子为3。但是我有一个开发...
Kafka流使用哪种分区策略?我们可以更改Kafka Stream中的分区策略,因为我们可以在普通的Kafka Consumer streamsConfiguration.put(ConsumerConfig ....
我正在开发卡夫卡流窗口,尤其是用于我的用例的翻滚窗口。 TimeWindowedKStream windowedStreams = groupedStreams .windowedBy(...
我们使用Spring云流与Kafka 2.0.1并利用InteractiveQueryService从商店获取数据。聚合数据后,有4个商店在磁盘上保留数据。代码 ...
Kafka Streams中的状态过滤/ flatMapValues?
我正在尝试编写一个简单的Kafka Streams应用程序(针对Kafka 2.2 / Confluent 5.2),将具有至少一次语义的输入主题转换为精确一次的输出流。我想 ...
我正在尝试测试一个拓扑,作为最后一个节点,它具有KTable。我的测试是使用完整的Kafka Cluster(通过汇合的Docker镜像),所以我没有使用TopologyTestDriver。我......
重启我的Kafka Streams应用程序时出现OutOfMemoryError
我有一个Kafka Streams应用程序(Kafka Streams 2.1 + Kafka broker 2.0),它根据TimeWindows进行聚合,我使用suppress运算符来抑制结果的输出。一切都运作良好......
我正在使用kafka流进行某些应用程序。流流如下kafkaProducer ----> StreamerConsumer1-> finalCosumer我有生产者,它可以非常快速地写入数据和我的StreamConsumer ......
Spring Cloud Stream Kafka Stream在加入后不写入目标主题
这是我的应用程序,它只是从客户主题(输入绑定)获取对KStream的引用,而从订单主题(订单绑定)获取另一个。然后它从客户主题创建一个KTable ...
我想ReKey一个GlobalKTable(可能在初始化它时,因为我相信它们只在创建时才被读取)。这可能吗?我在Spring / Java Kafka中有两个主题...
如何将从一个主题创建的流加入到来自其他主题的KTable派生(作为聚合操作)
问题:如何将从TOPIC_2(在步骤2中)创建的流加入到KTable stateTable(在格式的步骤1中)。目标:如果我们改变AlarmState的状态(KTable的值...)之后的连接操作
我有两个主题,一个有3个分区,一个有48个。最初我使用了默认的赋值器但是当消费者(kubernetes中的pod)崩溃时我遇到了一些问题。发生了什么事,当......
我正在使用Kafka Streams,我注意到它使我的kafka记录了很多日志消息,例如:[2019-04-17 09:06:16,541] INFO [Log partition = my-application-KSTREAM-AGGREGATE -state-STORE -...
我对流媒体提出了一个问题但是对于问题的范围,让我们用Kafka Streams限制自己。让我们通过将问题限制为单词计数来进一步缩小范围,......
我需要使用Kafka Sreams API和Processor API。我还想在处理器实现中将不同类型的对象写入不同的主题,即在进程中发出不同的对象&...
我们有一个Kafka制作人,可以以非常高的频率为保留时间= 10小时的主题生成键控消息。这些消息是实时更新,使用的密钥是元素的ID ...