apache-kafka-streams 相关问题

与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。

Kafka流处理器什么时候提交?不记录日志我们能做到容错吗?

假设我有一个简单的处理器,可以输出每个键的最新记录,使用不一定与 Kafka 时间戳相同的内部版本(因此不可能压缩日志)。

回答 1 投票 0

KafkaStreams 指标,每个流处理的记录/提取拓扑中每个流的偏移量

我正在使用 org.apache.kafka.streams.KafkaStreams,例如我的拓扑如下所示: StreamsBuilder 构建器 = new StreamsBuilder(); builder.stream("输入主题1") .

回答 1 投票 0

kafka 流是从主题分组消息并进一步发送的良好解决方案吗

我正在开发 Kafka Streams 应用程序,我想对按键分组的主题中的数据进行批处理,并进一步发送分组的数据。我不完全确定 Kafka Streams 是否是一个很好的解决方案

回答 1 投票 0

Kafka 流如何在任务内执行并行操作?

假设我们有一个 kafka-streams 拓扑,例如: 输入主题 | | \ / Kstream-源 ...

回答 1 投票 0

如何调试kafkastreams代码?

有人设法使用 IntelliJ IDEA 调试用 Java 8 编写的 kafkastreams 代码吗?我正在运行一个简单的linesplit.java代码,它从一个主题获取流并将其分割并将其发送到...

回答 3 投票 0

Kafka Streams:获取时间窗口中的事件计数

我有数据流作为事件。我想获取 10 分钟时间窗口内的事件计数并输出到另一个主题。以下是我的代码 StreamsBuilder StreamsBuilder = 新

回答 2 投票 0

创建 Kafka Stream 以根据值计算金额

我正在生成如下数据: 键:“迈克”,值:{“金额”:46,“时间”:“2021-11-05T07:53:32.005751Z”} 键:“约翰”,值:{“金额”:46,“时间&

回答 3 投票 0

使用 Kafka Connect 生成整数 (INT32) 键

我不知道我是否在这里白费力气,并试图做一些完全毫无意义的事情,但我想我还是会问,因为我已经花了足够长的时间把头撞到一个砖瓦...

回答 1 投票 0

KafkaStreams 一开始不读取来自 kafka 的消息

@Bean 公共 KafkaStreams kafkaStreams(KafkaStreamsConfiguration StreamsConfig) { 属性 props = new Properties(); props.put("bootstrap.servers", "localhost:9...

回答 1 投票 0

我可以在 Kafka Streams 拓扑和普通 Kafka 消费者之间使用相同的组 ID 吗?

我的服务是一个 Kafka Streams 应用程序,假设应用程序 ID 设置为“service1”,从主题 A 消费。我相信这个 ID 成为 Kafka Stre 中消费者的组 ID...

回答 1 投票 0

物化状态存储目录中缺少数据

我的应用程序在 GroupedKTable 上进行聚合,然后将其具体化为 PersistentKeyValueStore 我将 state.dir 配置为永久路径(不是默认的 /tmp/kstreams) 我可以看到钥匙-

回答 1 投票 0

Kstream-Ktable 与 CloudEvent 值的连接不起作用

我想在kafka流和ktable之间进行连接。 poc 可以很好地处理流数据。但是,当我使用 CloudEvent 时,我不断遇到与序列化相关的一些或其他问题。 这里...

回答 1 投票 0

无效拓扑:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表

启动 Spring Boot 应用程序时出现错误。 org.springframework.context.ApplicationContextException:无法启动bean“defaultKafkaStreamsBuilder”;嵌套异常是 org。

回答 1 投票 0

如何操作KafkaStreams KeyValueStore

我正在使用 KafkaStreams 来处理消息的 kafka 并将累积状态保存在 KeyValueStore 中。 但我还需要通过 API 调用以编程方式影响 KeyValueStore 的状态。 这...

回答 1 投票 0

使用 Kafka Streams 开发时,Lib 上的 UnsatisfiedLinkError 会影响 DB dll

我正在我的开发 Windows 机器上编写 Kafka Streams 应用程序。 如果我尝试使用 Kafka Streams 的 leftJoin 和分支功能,我在执行 jar 应用程序时会收到以下错误:

回答 7 投票 0

将两个kafka主题中的消息排序到另一个主题中

假设我有 2 个 Kafka 主题,每个主题都有一个分区。 我想使用 Kafka Streams、Apache Flink 或 Spring Kafka 等技术通过连接这两个主题将数据写入第三个主题

回答 1 投票 0

流式数据过滤算法

我正在使用 Kafka Streams 开发基于 Kafka 的流数据应用程序。 在这里,我的应用程序将在高峰时段(大约 10 小时)处理大约 400M 的事件,在这里我需要过滤这些事件

回答 1 投票 0

Kafka 状态存储作为本地持久化选项?

我有一个简单的分布式系统架构,其中一个生产者系统将事件写入一个 kafka 主题。这些事件基本上只被一个系统消耗。这位消费者整晚都在加载...

回答 1 投票 0

RocksDB 数据丢失

我有一个禁用日志记录的有状态 Kafka Streams 应用程序。我必须禁用它的原因是因为某些记录的聚合状态可能会变得非常大,并向cha发送更新...

回答 1 投票 0

KafkaStream 在代理滚动升级时达到 ERROR 状态

我正在使用 Kafka 2.8.1 (AWS MSK)。我观察到,每当 AWS 完成一些滚动升级时,我的 Kafka 流应用程序都会达到错误状态,并且在尝试时不断收到以下异常...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.