apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

如何在kstreams中生成主题的avro格式数据

KStream left = builder.stream(“source1”); left.toStream(“soure2”)想要在发送到source2之前序列化

回答 1 投票 0

使用Spring访问Kafka Stream State Store

引用此文档的4.2.6 https://docs.spring.io/spring-kafka/reference/htmlsingle/#kafka-streams如何使用kafka流弹簧支持访问州商店?没有春天,你......

回答 1 投票 1

为什么我偶尔会收到一个InvalidStateStoreException PARTITIONS_REVOKED,而不是在检索商店进行查询时运行?

我正在访问一个状态存储来查询它,并且必须用try / catch块包装store()语句来重试它,因为有时我得到这个异常:org.apache.kafka.streams.errors ....

回答 1 投票 1

为什么此KStream / KTable拓扑会传播未通过过滤器的记录?

我有以下拓扑:创建状态存储过滤基于SOME_CONDITION的记录,将其值映射到新实体,最后将这些记录发布到另一个主题...

回答 1 投票 2

从Kotlin调用通用Java varargs方法

我试图从Kotlin调用以下Java方法:KStream [] branch(final Predicate super K,?super V> ...谓词);此方法是Kafka Streams API的一部分。 ......

回答 2 投票 2

Kafka Streams - 来自保留政策主题的KTable

我正在尝试kafka流,我有以下设置:我有一个现有的kafka主题,其密钥空间是无限的(但可预测且众所周知)。我的话题有保留......

回答 1 投票 2

性能问题:Kafka Streams中有时会出现延迟峰值

我在Kafka Streaming中进行性能测试。我用Transformer创建了一个简单的Streams API。 //从输入主题builder.stream(Serdes.String(),Serdes.String(),inTopic)中流式传输数据...

回答 1 投票 0

计算给定窗口的流的统计信息

我有一个经常打勾的计时器KStream(想想秒数),我想在24小时的窗口内计算各种统计数据。例如,24小时更改,给定的价格差异...

回答 1 投票 1

从KSQL表中读取数据

也许这是一个初学者的问题,但是读取KSQL中生成的数据的推荐方法是什么?我们假设我做了一些流处理并将数据写入KSQL表。现在我想访问这个...

回答 1 投票 4

Kafka Stream Rebalancing:从REBALANCING到ERROR的状态转变

我有4个主题,包括单个分区和3个应用程序实例。我尝试通过编写自定义PartitionGrouper来实现可伸缩性,它将创建3个任务,如下所示:1st instance -...

回答 1 投票 1

Kafka Streams:使用相同的`application.id`来使用多个主题

我有一个需要收听多个不同主题的应用程序;每个主题都有关于如何处理消息的单独逻辑。我曾想过为每个人使用相同的kafka属性......

回答 1 投票 8

自动部署Kafka Stream应用

我刚刚开始使用Kafka和Kafka Streaming Applications。我写了一个Kafka Stream应用程序,它消耗一个主题,处理这些消息,并将它们发送到另一个主题。尽我所能......

回答 1 投票 4

Kafka-streams州说错误

流运行一段时间后会出现以下错误?我无法找到谁负责创建.sst文件?环境:Kafka版本0.10.0-cp1 scala 2.11.8 org.apache ....

回答 1 投票 1

Kafka代理请求队列峰值,导致Streams超时异常

我一直在监控4台机器的Kafka集群上的指标。我有一个输入应用程序写入消息到Kafka和几个Kafka Streams应用程序处理这些消息和...

回答 1 投票 2

如何将KStream打印到控制台?

我创建了一个Kafka主题并向其推送了一条消息。所以bin / kafka-console-consumer --bootstrap-server abc.xyz.com:9092-topic myTopic --from-beginning --property print.key = true --property key ....

回答 2 投票 2

max.poll.intervals.ms默认设置为int.Max

Apache Kafka文档说明:内部Kafka Streams使用者max.poll.interval.ms默认值已从300000更改为Integer.MAX_VALUE因为此值用于检测何时...

回答 1 投票 2

源和汇主题的输入记录时间戳和输出记录时间戳是否相同?

我使用Processor API创建kafka流应用程序。这是我如何创建一个主题,将时间戳附加到所有传入的消息kafka-topics.sh --create --zookeeper localhost:2181 --...

回答 1 投票 -1

Kafka KStreams中的'for'循环支持

我需要知道如何在我的kafka KStreams行中使用'for'循环...下面是我的'for'循环,需要包含在KStreams中(int i = 0; i <6; i ++){try { ...

回答 1 投票 0

KafkaStream createTopic不尊重Kafka服务器的auto.create.topics.enable设置

我们有一个生产Kafka集群,最近受到了一系列新主题的污染。 Kafka群集具有以下设置:auto.create.topics.enable = false delete.topic.enable = false ...

回答 1 投票 2

如何更改Kafka拓扑的消费者偏移量?

如今,我正在开发一个使用Kafka Streams处理消息的项目。我们的消息由两个标识符组成,其中一个用于用户标识符,第二个用于消息标识符...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.