与Apache Kafka的内置流处理引擎相关,称为Kafka Streams,它是一个用于使用Apache Kafka构建分布式流处理应用程序的Java库。
Kafka流处理器什么时候提交?不记录日志我们能做到容错吗?
假设我有一个简单的处理器,可以输出每个键的最新记录,使用不一定与 Kafka 时间戳相同的内部版本(因此不可能压缩日志)。
KafkaStreams 指标,每个流处理的记录/提取拓扑中每个流的偏移量
我正在使用 org.apache.kafka.streams.KafkaStreams,例如我的拓扑如下所示: StreamsBuilder 构建器 = new StreamsBuilder(); builder.stream("输入主题1") .
我正在开发 Kafka Streams 应用程序,我想对按键分组的主题中的数据进行批处理,并进一步发送分组的数据。我不完全确定 Kafka Streams 是否是一个很好的解决方案
假设我们有一个 kafka-streams 拓扑,例如: 输入主题 | | \ / Kstream-源 ...
有人设法使用 IntelliJ IDEA 调试用 Java 8 编写的 kafkastreams 代码吗?我正在运行一个简单的linesplit.java代码,它从一个主题获取流并将其分割并将其发送到...
我有数据流作为事件。我想获取 10 分钟时间窗口内的事件计数并输出到另一个主题。以下是我的代码 StreamsBuilder StreamsBuilder = 新
我正在生成如下数据: 键:“迈克”,值:{“金额”:46,“时间”:“2021-11-05T07:53:32.005751Z”} 键:“约翰”,值:{“金额”:46,“时间&
使用 Kafka Connect 生成整数 (INT32) 键
我不知道我是否在这里白费力气,并试图做一些完全毫无意义的事情,但我想我还是会问,因为我已经花了足够长的时间把头撞到一个砖瓦...
KafkaStreams 一开始不读取来自 kafka 的消息
@Bean 公共 KafkaStreams kafkaStreams(KafkaStreamsConfiguration StreamsConfig) { 属性 props = new Properties(); props.put("bootstrap.servers", "localhost:9...
我可以在 Kafka Streams 拓扑和普通 Kafka 消费者之间使用相同的组 ID 吗?
我的服务是一个 Kafka Streams 应用程序,假设应用程序 ID 设置为“service1”,从主题 A 消费。我相信这个 ID 成为 Kafka Stre 中消费者的组 ID...
我的应用程序在 GroupedKTable 上进行聚合,然后将其具体化为 PersistentKeyValueStore 我将 state.dir 配置为永久路径(不是默认的 /tmp/kstreams) 我可以看到钥匙-
Kstream-Ktable 与 CloudEvent 值的连接不起作用
我想在kafka流和ktable之间进行连接。 poc 可以很好地处理流数据。但是,当我使用 CloudEvent 时,我不断遇到与序列化相关的一些或其他问题。 这里...
无效拓扑:拓扑没有流线程,也没有全局线程,必须订阅至少一个源主题或全局表
启动 Spring Boot 应用程序时出现错误。 org.springframework.context.ApplicationContextException:无法启动bean“defaultKafkaStreamsBuilder”;嵌套异常是 org。
如何操作KafkaStreams KeyValueStore
我正在使用 KafkaStreams 来处理消息的 kafka 并将累积状态保存在 KeyValueStore 中。 但我还需要通过 API 调用以编程方式影响 KeyValueStore 的状态。 这...
使用 Kafka Streams 开发时,Lib 上的 UnsatisfiedLinkError 会影响 DB dll
我正在我的开发 Windows 机器上编写 Kafka Streams 应用程序。 如果我尝试使用 Kafka Streams 的 leftJoin 和分支功能,我在执行 jar 应用程序时会收到以下错误:
假设我有 2 个 Kafka 主题,每个主题都有一个分区。 我想使用 Kafka Streams、Apache Flink 或 Spring Kafka 等技术通过连接这两个主题将数据写入第三个主题
我正在使用 Kafka Streams 开发基于 Kafka 的流数据应用程序。 在这里,我的应用程序将在高峰时段(大约 10 小时)处理大约 400M 的事件,在这里我需要过滤这些事件
我有一个简单的分布式系统架构,其中一个生产者系统将事件写入一个 kafka 主题。这些事件基本上只被一个系统消耗。这位消费者整晚都在加载...
我有一个禁用日志记录的有状态 Kafka Streams 应用程序。我必须禁用它的原因是因为某些记录的聚合状态可能会变得非常大,并向cha发送更新...
KafkaStream 在代理滚动升级时达到 ERROR 状态
我正在使用 Kafka 2.8.1 (AWS MSK)。我观察到,每当 AWS 完成一些滚动升级时,我的 Kafka 流应用程序都会达到错误状态,并且在尝试时不断收到以下异常...