apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

有什么方法可以在Java API中从一个特定的偏移量开始消耗kafka主题?

我使用的是Kafka流API。当我启动我的应用程序时,有时会有一个间隙,我想从一个特定的偏移开始消费。StreamProperties.put(...)。

回答 1 投票 0

Kafka流--应用JVM随机崩溃。

我有一个Kafka Streams应用程序在多个JVM上运行(共5个)以提高吞吐量。运行了一个小时左右还算正常,然后,除了最后一个JVM,每个JVM都开始陆续崩溃。...

回答 1 投票 0

如何在KafkaStreams中为你的应用在多个实例中生成唯一的sequenceId?

我想为我们从主题中消费的东西生成唯一的有序Id,而且它应该在多个实例周围是唯一的。(不是uuid)

回答 1 投票 0

Kafka Streams Aggregator中的访问记录偏移量

我有一个简单的窗口化拓扑: builder.stream("input-topic", Consumed.with(...)) .groupByKey() .windowedBy(TimeWindows.of(windowSize)).advanceBy(windowAdvance).grace(windowGrace)).......。

回答 1 投票 0

Kafka Streams - 为每条记录提取对象列表的时间戳。

我想实现的是根据消息中存在的时间戳来获取记录中存在的每条消息的计数。每条记录由列表组成 对象。我想提取...

回答 1 投票 0

从Spring Cloud Streams Kafka Stream应用中的处理器写入主题

我是用处理器API来做一些低级处理到一个状态存储中。关键是我还需要在存储到存储后写进一个主题。在Spring Cloud中怎么能做到 ...

回答 1 投票 0

kafka流应用的行为,当应用没有进行偏移提交时(自动提交关闭)。

我想知道当(使用低级API)自动提交关闭,并且应用程序不做显式提交时,kafka流的行为是什么?如果应用程序重新启动(自动提交 ...

回答 2 投票 0

序列器与实际的键类型不兼容(键类型:java.lang.String)。

我试图用GlobaKTable加入流,得到了序列化问题。加入KStream的产品流 与productTable是GlobalKTable (这里...

回答 1 投票 0

Kafka流应用消费者偏移量在重启时被重置为最早的。

我们有一个Kafka Streams应用程序正在经历奇怪的行为。当作业被杀死并重新启动时,消费者组随机地将其偏移量重置为最早的,所有的旧 ...

回答 1 投票 0

卡夫卡流跳窗顶N按维度划分

我有一个kafka流,我需要一个处理器,它可以做到以下几点。使用45秒的跳转窗口和5秒的进阶来计算前5名的数量 基于域对象的一个维度。...

回答 1 投票 1

Kafka流固定窗口不按键分组。

我得到一个单一的Kafka流。我如何在特定的时间窗口内积累消息,而不考虑密钥?我的用例是在不考虑密钥的情况下,每10分钟从一个流中写入一个文件。

回答 1 投票 0

Kafka有状态流处理器与stattore。幕后花絮

我想了解有状态流处理器。据我所知,在这种类型的流处理器中,它使用State Store来维护某种状态。我知道,其中一种方法是将状态存储...

回答 1 投票 0

如何扩展运行在kubernetes上的Kafka Streams应用?

我有一个kafka streams App,它运行在Kube集群的一个pod上。试图找到一种方法,让Kube基于kafka话题滞后来扩展updown pods。有没有人走过的路径,并愿意... ...

回答 1 投票 0

Kafka Streams与RocksDB的内存分配问题

我试图做一个简单的Kafka流应用(v2.3.0),收集指定时间间隔的统计数据(例如每分钟即翻滚的窗口)。因此我按照教科书...

回答 1 投票 1

当使用 enable.idempotence true 时,Spring Cloud Stream Kafka 应用程序的启动速度极慢。

我的Scs应用有两个Kafka生产者,配置如下:spring: cloud: function: definition: myProducer1;myProducer2 stream: bindings: myproducer1-out-0: ...

回答 1 投票 4

Kafka流在窗口前访问最新的值。

我有一个带窗口的流,它需要根据窗口内发生的所有值加上窗口前发生的最新值来计算一个值。 输入流.groupByKey....

回答 1 投票 0

我如何创建一个既是kafka消费者又是kafka生产者的组件?

我试图创建一个组件,从一个主题消耗数据,处理数据并发送到另一个主题,也就是说,我需要让我的组件既是消费者又是生产者。我如何配置这个...

回答 1 投票 0

待机任务不向.检查点文件写入更新。

我有一个Kafka Streams应用,配置为每个任务创建一个备用副本。我有两个应用程序的实例在运行。当应用程序启动时,应用程序...

回答 1 投票 1

Kafka Streams standy by replicas,num.standby.replicas。

假设我们有4个Kafka流实例执行有状态的操作,如果num.standby.replicas=4,这个配置是不是等于使用GlobalKTable而不是KTable?

回答 1 投票 0

序列化异常。反序列化Avro消息错误(StringIndexOutOfBoundsException)

当KafkaStream试图反序列化Arvo消息时,我遇到了这个错误。[filtering-app-6adef284-11eb-48f8-8ca0-cde7da5224ab-StreamThread-1] ERROR org.apache.kafka.streams.KafkaStreams - ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.