用于与Apache Kafka使用者API相关的问题
我设置了(connection.backoff.ms,retry.backoff.ms)参数为5分钟,期望从kafka发送到断开连接的数据库后,重试时间为5分钟。但是重启后...
我在 Kafka 文档的任何地方都找不到消费者如何使用来自多个分区的数据,尤其是它如何决定从下一个分区中选择哪个分区以及是否进行此轮询
我写了一个Kafka消费者和生产者。 producer是transacted producer,consumer配置isolation.level为read_commited。 但最终,过了一段时间,消耗...
我正在使用 spring reactive kafka 创建消费者,但是当出现异常时重试逻辑不起作用。 @EventListener(ApplicationReadyEvnt.class) 公共无效消费(){
Error: Kafka target database批量录入错误(主键冲突为Null值)
Error: Batch entry 0 INSERT INTO "person" ("ID","NAME") VALUES (NULL,'Alex') ON CONFLICT ("ID") DO UPDATE SET "NAME"=EXCLUDED."NAME" was
错误:表 'person' 的 PK 模式是 RECORD_KEY,但缺少记录键模式
资料来源:甲骨文数据库 目标:Postgres 使用 Kafka 复制数据。 当我插入时,它工作正常。 当我更新源上的记录时,它会在目标上放置一个新条目而不是向上......
DophinDB Kafka 插件:无法从 Kafka 主题读取数据
我订阅了一个只有一个分区的Kafka主题。我想读取数据,但是失败了,是不是哪里出了问题? 我的脚本如下: 经纪商名单 = "....." groupId = "Test001&qu...
即使使用“auto.offset.reset”也不会收到旧消息:“最早”
我正在尝试使用 Confluent Kafka 在 python 中编写一个 kafka 消费者。我可以获得所有新消息,但如果我杀死并重新启动我的消费者,我不会收到任何旧消息 def confluent_kafka_consumer(app)...
Kafka 2.12 Consumer Group (with command prompt) - Second consumer can't seem to receive the message?
我从 Apache Kafka 2.12 开始,在使用命令提示符时练习消费者组时遇到了问题:当创建两个消费者并设置为订阅时...
Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。
我使用Spring Cloud Webflux和Spring Cloud Streams Functional Interfaces来处理我的kafka处理。如果我按顺序进行处理,并且如果我杀死应用程序,它就会返回 ...
在我们的kafka消费者中使用spring kafka。根据我的业务需求,我需要在处理同一批记录时,再次轮询回同一批记录。根据https:/...
我使用KafkaTemplate从Spring boot.Java 8我的主要目的是,消费者不应该消耗的消息两次。1) 调用一个表获取100行并发送给Kafka 2) 假设我在处理...
我需要完全阅读话题1的消息,然后再阅读话题2的消息。我每天都会在这些主题中收到一次消息,我需要完全阅读主题1中的消息,然后再阅读主题2中的消息。我设法停止读取topic2的消息,然后再...
我想实现的是确保我的Kafka流消费者不会有滞后。我有一个简单的Kafka流应用,将一个主题以GlobalKTable的形式具体化为存储。当我...
Kafka Version 2.4.1 Zookeeper 3.4.10 我在删除kafka streams应用程序创建的消费组时遇到了麻烦。即使应用程序已关闭,但该组总是报告为'...'。
有什么方法可以在Java API中从一个特定的偏移量开始消耗kafka主题?
我使用的是Kafka流API。当我启动我的应用程序时,有时会有一个间隙,我想从一个特定的偏移开始消费。StreamProperties.put(...)。
KafkaConsumer.commitSync()到底能提交什么?
KafkaConsumer.commitSync是否像JavaDoc所说的那样,只是提交 "最后一次poll()返回的偏移量",(可能会漏掉一些没有包含在最后一次poll结果中的分区),或者它实际上是在提交 ...
我们有一个Kafka Streams应用程序正在经历奇怪的行为。当作业被杀死并重新启动时,消费者组随机地将其偏移量重置为最早的,所有的旧 ...
发送Kafka消息的更好方式:JsonNode或Map<String,UserModel>?
我是新的Kafka与Spring Boot。我们可以通过Json String Streams JsonNode Map来发送Kafka消息。因此,现在在我的项目中,我想发送Map到我的一个消费者。所以作为一个...
我的应用程序正在从kafka读取一个主题,丰富后保存到另一个主题。StreamsConfig.NUM_STREAM_THREADS_CONFIG配置为8。我有两个broker和12个分区。...