用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。
我希望避免向 Kafka 主题发送重复的消息。 实现它的理想方式是什么? 使用 Apache Kafka 的 Java 客户端,是否可以在调用之前验证消息是否存在
我们可以在Spring Boot中使用多个kafka模板吗?
在我的 spring boot kafka 发布者应用程序中,我想提供对以 String(json) 或字节格式发布消息的支持,因为我想同时提供对 json 和 avro 的支持。但是
KafkaJSProtocolError:鉴于当前 SASL 状态,请求无效
我有一个 AWS MKS 集群,我可以在其中创建主题、生成消息并使用我的 ec2 服务器中安装的 Kafka 消费消息。但是当我尝试在我的nodej中使用Kafka生产者时......
具有 1 个分区的 kafka 主题。 1 个生产者已在向该分区生产。如果我添加另一个制作人,我会遇到什么问题
我有一个有 1 个分区的主题。 其中一个系统充当生产者并将数据发送到该分区。 我想添加另一个不同的系统作为生产者并将数据发送到该分区...
Kafka Processor API 中 Header 有什么用?
我正在学习Kafka Processor API并在ProcessorContext中找到一个方法头。 标题() 返回当前输入记录的标题;可能 如果不可用则为 null 我什么...
Kafka - 日志结束偏移量(LEO)与高水位线(HW)之间的差异
Replica(Leader Replica)中LEO和HW有什么区别? 它们会包含相同的数字吗?我可以理解 HW 是最后提交的消息偏移量。 LEO什么时候更新,如何更新?
我注意到Kafka Producer使用tcp协议。 有没有办法提前固定多个Kafka生产者的源端口? 或者至少,有什么方法可以跟踪 Ka 的源端口...
我运行一个生产者并使用以下代码引发异常。 @GetMapping("/发送") 公共 ResultVo send(@RequestParam(value = "content") 字符串内容) {
所有生成的 kafka 消息始终发送到分区 0,即使密钥哈希值为 1
我有一个带有 2 个分区的 Kafka 主题。我创建了一个带有 2 个不同键的消息处理程序。 @豆 @ServiceActivator(inputChannel = "pushDataRequestChannel") 公共消息处理程序
我有一个卡夫卡生产者暂时失去了与经纪人的联系。每个主题定义有 10 个分区。当连接丢失时,我在运行该程序的进程中看到以下日志...
无法连接到种子代理,请尝试列表中的另一个代理:连接失败:端口应 >= 0 且 < 65536
我有一个 AWS MKS 集群,我可以在其中创建主题、生成消息并使用我的 ec2 服务器中安装的 Kafka 消费消息。但是当我尝试在我的nodej中使用Kafka生产者时......
我的应用程序应该能够接受任何对象。 接受对象后,我需要将消息发布到 kafka 主题。主题附加了架构。 我能够进行模式验证...
我一直在尝试使用 python confluent-kafka 库将消息发送到 AWS MSK。 我想确保每条消息的单一传递,这就是我使用基于事务的生产者的原因。我现在...
当 Kafka Broker 宕机并恢复时,kafka producer 中的数据丢失
每当 Kafka 代理关闭并重新加入时,我都会面临一些数据丢失。我想每当代理加入集群时都会触发重新平衡,此时我观察到一些错误...
Kafka 在高负载时抛出超时错误,错误:org.apache.kafka.common.errors.TimeoutException
我在应用中使用了Spring Kafka,当负载过高时,会抛出超时异常: 异常:topic_name 的 XX 记录过期:600904 ms 自批次创建以来已过去...
xxxxx 的 1 条记录即将过期:自批创建以来已过去 30030 毫秒加上逗留时间
我的用例: 使用 Postman,我调用了一个 Spring boot soap 端点。端点创建一个 KafkaProducer 并将消息发送到特定主题。我还有一个 TaskScheduler 来使用该主题。 该...
Kafka Consumer 没有收到来自主题的消息——如何调试?
Producer.py 从 binance.websocket.spot.websocket_client 导入 SpotWebsocketClient 从 binance.spot 导入 Spot 作为客户端 从 kafka.admin 导入 KafkaAdminClient,NewTopic 从卡夫卡导入
在 Kafka 中写入主题时出现 LocalBuffer 已满错误
所以,我们有一个生产者可以正常运行几个月没有错误,但突然开始出错 'BufferError:本地:队列已满' 我最初遇到过这个问题,然后
在 Kafka producer 中设置 `compression.type` 与在 broker 中定义它有什么区别?
我正在尝试设置 compression.type,目前在我的代理配置中进行设置。如果我不在我的生产者中定义属性,它会生效吗?我没有指定任何 compression.type ...
创建连接器时无法获取 producer.override.* 的值
为了获得更高的吞吐量,CSVSourceConnector 在创建连接器时使用 producer.override.* 覆盖生产者配置。但是看不出有什么区别...