apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

对于翻滚窗口,emitEarlywhenFull()的抑制如何工作?

我在翻滚窗口上使用抑制来获得聚合结果。我正在探索untilTimeLimit和untilWindowCloses进行压制。当缓冲区满时,我不希望我的流关闭。我有 ...

回答 1 投票 0

Spring Cloud Stream会产生不必要的复杂Kafka拓扑,为什么?

我有一个KStream应用程序,包含一堆KStream,连接和其他操作。我启用了logging.level.org.springframework.kafka.config = debug以验证正在生成的拓扑并且...

回答 1 投票 0

Kafka用例:从kafka连续读取,对消息执行解密,然后插入到db

Kafka用例:从Kafka(可能是一个流)连续读取,使用Java方法对消息值执行解密,然后插入到db(或者接收器或者我有一个Java REST API,可以...

回答 1 投票 0

Kafka ktable腐败的消息处理

我们正在使用Ktabke在kafka中进行聚合,它是非常基本的用法,并且已经从kafka doc中引用。我只是试图调查一下,如果一些消息消费失败,同时聚合我们如何...

回答 1 投票 0

Kafka Streams恰好一次TransactionalIdAuthorizationException

在将PROCESSING_GUARANTEE_CONFIG设置为exact_once之后,我的kafka流应用程序表现良好,我的复制因子为3。但是我有一个开发...

回答 1 投票 0

卡夫卡流的分区策略

Kafka流使用哪种分区策略?我们可以更改Kafka Stream中的分区策略,因为我们可以在普通的Kafka Consumer streamsConfiguration.put(ConsumerConfig ....

回答 1 投票 0

如何在使用翻滚窗口时选择时间语义的类型?

我正在开发卡夫卡流窗口,尤其是用于我的用例的翻滚窗口。 TimeWindowedKStream windowedStreams = groupedStreams .windowedBy(...

回答 1 投票 1

使用状态存储的流应用程序最多需要1小时才能重新启动

我们使用Spring云流与Kafka 2.0.1并利用InteractiveQueryService从商店获取数据。聚合数据后,有4个商店在磁盘上保留数据。代码 ...

回答 1 投票 1

Kafka Streams中的状态过滤/ flatMapValues?

我正在尝试编写一个简单的Kafka Streams应用程序(针对Kafka 2.2 / Confluent 5.2),将具有至少一次语义的输入主题转换为精确一次的输出流。我想 ...

回答 1 投票 0

应该由KTable发出的事件

我正在尝试测试一个拓扑,作为最后一个节点,它具有KTable。我的测试是使用完整的Kafka Cluster(通过汇合的Docker镜像),所以我没有使用TopologyTestDriver。我......

回答 1 投票 1

重启我的Kafka Streams应用程序时出现OutOfMemoryError

我有一个Kafka Streams应用程序(Kafka Streams 2.1 + Kafka broker 2.0),它根据TimeWindows进行聚合,我使用suppress运算符来抑制结果的输出。一切都运作良好......

回答 1 投票 0

Kafka通过多个分区和多个消费者线程来增加吞吐量

我正在使用kafka流进行某些应用程序。流流如下kafkaProducer ----> StreamerConsumer1-> finalCosumer我有生产者,它可以非常快速地写入数据和我的StreamConsumer ......

回答 1 投票 0

Spring Cloud Stream Kafka Stream在加入后不写入目标主题

这是我的应用程序,它只是从客户主题(输入绑定)获取对KStream的引用,而从订单主题(订单绑定)获取另一个。然后它从客户主题创建一个KTable ...

回答 1 投票 0

是否可以重新使用GlobalKTable?

我想ReKey一个GlobalKTable(可能在初始化它时,因为我相信它们只在创建时才被读取)。这可能吗?我在Spring / Java Kafka中有两个主题...

回答 1 投票 4

如何将从一个主题创建的流加入到来自其他主题的KTable派生(作为聚合操作)

问题:如何将从TOPIC_2(在步骤2中)创建的流加入到KTable stateTable(在格式的步骤1中)。目标:如果我们改变AlarmState的状态(KTable的值...)之后的连接操作

回答 1 投票 0

了解kafka流分区分配器

我有两个主题,一个有3个分区,一个有48个。最初我使用了默认的赋值器但是当消费者(kubernetes中的pod)崩溃时我遇到了一些问题。发生了什么事,当......

回答 1 投票 2

Kafka Streams - 洪水kafka日志

我正在使用Kafka Streams,我注意到它使我的kafka记录了很多日志消息,例如:[2019-04-17 09:06:16,541] INFO [Log partition = my-application-KSTREAM-AGGREGATE -state-STORE -...

回答 1 投票 0

用Kafka Streams计数

我对流媒体提出了一个问题但是对于问题的范围,让我们用Kafka Streams限制自己。让我们通过将问题限制为单词计数来进一步缩小范围,......

回答 2 投票 1

使用Kafka Stream DSL写入处理器内的主题

我需要使用Kafka Sreams API和Processor API。我还想在处理器实现中将不同类型的对象写入不同的主题,即在进程中发出不同的对象&...

回答 1 投票 0

从消费者开始的主题中获取最新值,然后正常继续

我们有一个Kafka制作人,可以以非常高的频率为保留时间= 10小时的主题生成键控消息。这些消息是实时更新,使用的密钥是元素的ID ...

回答 1 投票 2

© www.soinside.com 2019 - 2024. All rights reserved.