kafka-producer-api 相关问题

用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。

验证 Kafka 主题中是否存在消息

我希望避免向 Kafka 主题发送重复的消息。 实现它的理想方式是什么? 使用 Apache Kafka 的 Java 客户端,是否可以在调用之前验证消息是否存在

回答 3 投票 0

我们可以在Spring Boot中使用多个kafka模板吗?

在我的 spring boot kafka 发布者应用程序中,我想提供对以 String(json) 或字节格式发布消息的支持,因为我想同时提供对 json 和 avro 的支持。但是

回答 3 投票 0

KafkaJSProtocolError:鉴于当前 SASL 状态,请求无效

我有一个 AWS MKS 集群,我可以在其中创建主题、生成消息并使用我的 ec2 服务器中安装的 Kafka 消费消息。但是当我尝试在我的nodej中使用Kafka生产者时......

回答 1 投票 0

具有 1 个分区的 kafka 主题。 1 个生产者已在向该分区生产。如果我添加另一个制作人,我会遇到什么问题

我有一个有 1 个分区的主题。 其中一个系统充当生产者并将数据发送到该分区。 我想添加另一个不同的系统作为生产者并将数据发送到该分区...

回答 1 投票 0

Kafka Processor API 中 Header 有什么用?

我正在学习Kafka Processor API并在ProcessorContext中找到一个方法头。 标题() 返回当前输入记录的标题;可能 如果不可用则为 null 我什么...

回答 2 投票 0

Kafka - 日志结束偏移量(LEO)与高水位线(HW)之间的差异

Replica(Leader Replica)中LEO和HW有什么区别? 它们会包含相同的数字吗?我可以理解 HW 是最后提交的消息偏移量。 LEO什么时候更新,如何更新?

回答 3 投票 0

我们可以为Kafka Producer分配源端口吗?

我注意到Kafka Producer使用tcp协议。 有没有办法提前固定多个Kafka生产者的源端口? 或者至少,有什么方法可以跟踪 Ka 的源端口...

回答 2 投票 0

Kafka 生产者 - 无法将记录追加到只读镜像主题

我运行一个生产者并使用以下代码引发异常。 @GetMapping("/发送") 公共 ResultVo send(@RequestParam(value = "content") 字符串内容) {

回答 1 投票 0

所有生成的 kafka 消息始终发送到分区 0,即使密钥哈希值为 1

我有一个带有 2 个分区的 Kafka 主题。我创建了一个带有 2 个不同键的消息处理程序。 @豆 @ServiceActivator(inputChannel = "pushDataRequestChannel") 公共消息处理程序

回答 1 投票 0

Kafka golang 生产者在错误后更改分区计数

我有一个卡夫卡生产者暂时失去了与经纪人的联系。每个主题定义有 10 个分区。当连接丢失时,我在运行该程序的进程中看到以下日志...

回答 0 投票 0

无法连接到种子代理,请尝试列表中的另一个代理:连接失败:端口应 >= 0 且 < 65536

我有一个 AWS MKS 集群,我可以在其中创建主题、生成消息并使用我的 ec2 服务器中安装的 Kafka 消费消息。但是当我尝试在我的nodej中使用Kafka生产者时......

回答 0 投票 0

如何在不生成 POJO 类的情况下发送 avro 消息

我的应用程序应该能够接受任何对象。 接受对象后,我需要将消息发布到 kafka 主题。主题附加了架构。 我能够进行模式验证...

回答 1 投票 0

AWS MSK(kafka)生产者事务提交超时

我一直在尝试使用 python confluent-kafka 库将消息发送到 AWS MSK。 我想确保每条消息的单一传递,这就是我使用基于事务的生产者的原因。我现在...

回答 0 投票 0

当 Kafka Broker 宕机并恢复时,kafka producer 中的数据丢失

每当 Kafka 代理关闭并重新加入时,我都会面临一些数据丢失。我想每当代理加入集群时都会触发重新平衡,此时我观察到一些错误...

回答 1 投票 0

Kafka 在高负载时抛出超时错误,错误:org.apache.kafka.common.errors.TimeoutException

我在应用中使用了Spring Kafka,当负载过高时,会抛出超时异常: 异常:topic_name 的 XX 记录过期:600904 ms 自批次创建以来已过去...

回答 0 投票 0

xxxxx 的 1 条记录即将过期:自批创建以来已过去 30030 毫秒加上逗留时间

我的用例: 使用 Postman,我调用了一个 Spring boot soap 端点。端点创建一个 KafkaProducer 并将消息发送到特定主题。我还有一个 TaskScheduler 来使用该主题。 该...

回答 1 投票 0

Kafka Consumer 没有收到来自主题的消息——如何调试?

Producer.py 从 binance.websocket.spot.websocket_client 导入 SpotWebsocketClient 从 binance.spot 导入 Spot 作为客户端 从 kafka.admin 导入 KafkaAdminClient,NewTopic 从卡夫卡导入

回答 0 投票 0

在 Kafka 中写入主题时出现 LocalBuffer 已满错误

所以,我们有一个生产者可以正常运行几个月没有错误,但突然开始出错 'BufferError:本地:队列已满' 我最初遇到过这个问题,然后

回答 1 投票 0

在 Kafka producer 中设置 `compression.type` 与在 broker 中定义它有什么区别?

我正在尝试设置 compression.type,目前在我的代理配置中进行设置。如果我不在我的生产者中定义属性,它会生效吗?我没有指定任何 compression.type ...

回答 2 投票 0

创建连接器时无法获取 producer.override.* 的值

为了获得更高的吞吐量,CSVSourceConnector 在创建连接器时使用 producer.override.* 覆盖生产者配置。但是看不出有什么区别...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.