apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

如何替换/转换Kafka消费者群体?

我有一个包含多个Kafka流的存储库。我想将其中一个流提取到其自己的存储库中。但是,我不确定如何处理该流的使用者组。这就是我的意思:...

回答 1 投票 1

带抑制的Kafka SessionWindow仅在有稳定的输入记录流时才发送最终事件

如果没有连续的输入记录流,则似乎带有会话窗口的宽频流的Kafka流似乎无法输出最终事件。上下文:我们正在使用...

回答 1 投票 1

如何在Kafka字数统计程序中修改KStream键和值?

我是Kafka Streams的新手,有点陷入基本的字数统计程序中。在下面的程序中,我尝试更改value的大小写,但不起作用(val wordCountInputProcessed = ...

回答 1 投票 0

KafkaStreams中的INCOMPLETE_SOURCE_TOPIC_METADATA错误

我正在编写KafkaStreams应用程序,并将maximum.num.threads设置为1。我有三个源主题,分别有6,8,8个分区。当前使用4个实例运行此流拓扑,因此4 ...

回答 1 投票 1

Kafka在春季流式传输,仅对一次ACL造成麻烦:TransactionalIdAuthorizationException

我无法从springboot kafka-streams应用程序连接到Confluent云上的主题,连接失败并出现一条错误行org.apache.kafka.common.errors...。]

回答 1 投票 0

如何修改KStream键和值?

我是Kafka Streams的新手,有点陷入基本的字数统计程序中。在下面的程序中,我试图更改value的大小写,但它不起作用(wordCountInput.mapValues(value => value ....

回答 1 投票 1

Kafka流抑制不返回任何值

我正在努力尝试一个相对简单的窗口字数示例。我正在尝试仅获取窗口结果,但什么也收不到。 KStream sl = s ... ....

回答 1 投票 1

如何更改@KafkaStreamsStateStore kafka流云的默认Serdes

如何更改@KafkaStreamsStateStore的默认Serdes?我知道在Kafka流云的新版本3.0.1中,此处说明了这种方式:https://cloud.spring.io/spring-cloud-static/spring -...

回答 1 投票 0

将数据存储到GlobalKTable

我无法理解如何将数据保存到globalKTable。我尝试使用GlobalKTable在Java中创建GlobalKTable globalTable = builder.globalTable(inputTopic,Materialized。> as(“ global -...

回答 2 投票 1

StateStore永远不会添加到Spring云中

任何帮助我如何在Spring云上添加状态存储,我总是收到此错误“嵌套异常是org.springframework.kafka.KafkaException:无法启动流:;嵌套异常是org.apache...。

回答 2 投票 0

如何在多个磁盘上分发Kafka-Streams状态存储

在Kafka Broker上,建议对消息日志使用多个驱动器,以提高吞吐量。这就是为什么它们具有log.dirs属性,该属性可以具有将被分配的多个目录的原因...

回答 1 投票 0

在Kafka Streams中创建全局状态存储(春季)

我是Kafka的新手,并试图创建一个小的Kafka KTable实现。我已经成功添加了KTable并能够查询。我已经使用了本地状态存储,并且按预期方式工作。下面...

回答 1 投票 0

从Kafka获取具有特定键的所有事件

我的系统中有一些主题,可以存储给定实体的事件。现在,我想对事件日志进行一些分析。因此,我需要查询属于...

回答 2 投票 0

删除Kafka StateStore中的记录不起作用(在.delete(key)上抛出NullPointerException)

我的代码中有一个实例化的内存状态存储。我还有另一个单独的流,应该根据某些条件查找和删除记录。我需要允许我的信息流访问,然后...

回答 1 投票 0

带有窗口的Kafka Streams拓扑不会触发状态更改

我正在构建以下Kafka Streams拓扑(伪代码):gK = builder.stream()。gropuByKey(); g1 = gK.windowedBy(TimeWindows.of(“ PT1H”))。reduce()。mapValues()。toStream()。mapValues()。selectKey()...

回答 1 投票 3

如何在运行有多个实例的Kafka Streams上进行主题级别的排序/计数

我是Kafka Streams的新手,正在寻找一种方法来订购跨分区的流数据。我的销售数据主题有10个分区,并根据所售商品进行分区。例如,...

回答 1 投票 0

检查StateStore是否已完全填充

我有一个紧凑的主题,约有30个Mio键。我的应用程序将此主题具体化为KeyValueStore。如何检查KeyValueStore是否已完全填充?如果我通过InteractiveQuery查询键...

回答 1 投票 0

我们可以在全局状态存储恢复期间调用处理器吗?

我阅读了Kafka流的用例,以讨论此stackover流中添加全局存储的问题,并了解到恢复期间全局状态存储会跳过处理器。有没有办法强制全局状态...

回答 1 投票 1

全局状态存储区不创建更改日志主题,如果全局存储区的输入主题具有空键,解决方法是什么?

我了解了很多有关全局状态存储的知识,它不会为还原创建更改主题,而是将源主题用作还原。我正在创建自定义密钥并将数据存储在全局状态存储中,...

回答 1 投票 0

[Kafka流Ktable Ktable连接在输出主题中不产生结果

我正在尝试一个非常简单的Ktable来加入Scala中的Ktable。这两个主题都有一个分区,但我仍然看不到“输出主题”对象的任何东西SimpleMerge扩展了App {val ...

回答 1 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.