kafka-producer-api 相关问题

用于与Apache Kafka生产者API相关的问题。有关制作Kafka主题的任何问题。生产者失败和恢复,幂等性和事务性API。

在使用 MockProducer 调用 send 之前给出完成指令

我有以下代码(显然是作为示例): 类随机类( val制作人:制作人 ){ 有趣的 randomFunction(): 布尔值 { // 使用 .get() 因为我想

回答 1 投票 0

我可以忽略org.apache.kafka.common.errors.NotLeaderForPartitionExceptions吗?

我的 Apache Kafka 生产者 (0.9.0.1) 间歇性地抛出 org.apache.kafka.common.errors.NotLeaderForPartitionException 我执行 Kafka 发送的代码类似于此 最终的未来<

回答 2 投票 0

通过 kafka CLI 生成带有标头的 json 消息

尝试通过 kafka kafka-console- Producer.sh 生成简单的消息 信息: {“状态”:“成功”,“属性”:[]} 标题: 标头键 1:标头值 1 标头密钥2:标头V...

回答 1 投票 0

如何连接到云端的 Kafka 生产者

我对使用已经在云上创建的kafka生产者和使用新的Producer生产者=新的KafkaProducer以编程方式创建新的生产者感到困惑 我对使用已经在云上创建的 kafka 生产者和使用 new Producer Producer = new KafkaProducer (props)以编程方式创建新的 prodcuer 之间感到困惑。我如何调用已经存在的生产者,即在云上创建的生产者。假设它是一个简单的java类,没有使用任何像spring这样的框架 创建新生产者与调用现有生产者 连接到已在云上创建的 Kafka 生产者需要配置 Kafka 客户端以连接到远程 Kafka 集群。以下是在简单的 Java 类中使用 Kafka 生产者 API 连接到在云上创建的现有 Kafka 生产者的步骤,无需任何 Spring 等框架: 配置 Kafka Producer 属性: 从云提供商或管理员处获取现有 Kafka 生产者的配置详细信息(例如引导服务器、安全设置)。相应地更新您的属性。 Properties props = new Properties(); props.put("bootstrap.servers", "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092"); // Add other required properties like security settings, serializers, etc. 创建Kafka生产者: 使用 KafkaProducer 类创建具有提供的属性的 Kafka 生产者实例。 Producer<String, String> producer = new KafkaProducer<>(props); 发送消息: 您可以使用send方法向云端Kafka集群中的主题发送消息。 ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key", "value"); producer.send(record); 关闭生产者: 使用完生产者释放资源后,请确保将其关闭。 producer.close(); 完整的 Java 类可能如下所示: import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CloudKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092"); // Add other required properties like security settings, serializers, etc. Producer<String, String> producer = new KafkaProducer<>(props); try { ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key", "value"); producer.send(record); } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } } 确保将 "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092" 和 "your-topic" 分别替换为实际的云 Kafka 代理地址和主题名称。此外,根据云提供商的规范配置其他必要的属性。

回答 1 投票 0

AWS MSK 事务支持

我正在尝试在 Lambda 函数内创建一个 Kafka Producer,并启用 Exactly-Once Delivery 支持以将消息推送到 MSK。 编辑:MSK IAM Auth 用于 Kafka 和

回答 1 投票 0

主题在 60000 毫秒后不存在于元数据中

用例场景: 我试图确保万一生产者为了获取元数据而连接的代理(引导程序之一)不可用,然后在阻止它之后...

回答 1 投票 0

kafka-console- Producer.sh错误:无法找到或加载主类kafka.tools.ConsoleProducer

刚刚安装了spotify/kafka的Docker镜像。须藤码头工人 PS: 82c411a52a38 spotify/kafka "supervisord -n" 26 分钟前 更新 26 分钟 0.0.0.0:2181->

回答 3 投票 0

如何正确处理Spring Kafka Producer发送消息失败?

我有一个 Spring Kafka 应用程序,它接收 HTTP 请求并将其有效负载发送到 Kafka 主题。我想预见以下不成功的情况: 最初,应用程序可以运行

回答 1 投票 0

Kafka 总是有一个消费者消费一组中的主题消息

我有两个具有相同组ID的消费者服务器订阅了相同的主题。 一台 kafka 服务器仅运行一个分区。 据我所知,消息应该在这两个中随机消耗

回答 2 投票 0

Apache Kafka - 重置上次看到的分区纪元。为什么?

我们使用 Kafka Mirror Maker Version 1 在 Kafka 集群之间镜像数据。我知道 MM1 已被弃用,但它是一款可靠的软件,完全可以满足我们的需要。我们将它用于专用...

回答 3 投票 0

连接到在 WSL2 Ubuntu 中运行的 Kafka

以下是我的Kafka代理配置 经纪人.id=1 端口=9092 主机名=127.0.0.1 广告.listeners=PLAINTEXT://127.0.0.1:9092 听众=PLAINTEXT://127.0.0.1:9092 控制台生产商和缺点...

回答 3 投票 0

Kafka 生产者将消息发布到辅助集群

建议的集群设置的描述 2个数据中心,每个数据中心有5个节点的Kafka集群 集群具有相同的主题和相同的生产者/消费者实例 没有数据

回答 2 投票 0

Kafka深度健康检查

我们最近遇到了一个问题,Kafka 代理遇到了阻止 IO 的内核问题(但我猜能够向 Zookeeper 发送心跳)。这样做的结果是 Kafka Broker ...

回答 2 投票 0

Apache Kafka 生产者配置:'request.timeout.ms' VS。 “max.block.ms”属性

鉴于以下同步kafka生产者 属性 props = new Properties(); props.put("max.block.ms", 30000); props.put("request.timeout.ms", 30000); props.put("重试", 5); 卡夫卡生产者<

回答 3 投票 0

如何访问kafka生产者使用的confluence注册表模式id?

我有一个使用汇合模式注册表的kafka生产者。我知道有一种算法,KafkaAvroSerializer 基于该算法从汇合模式寄存器中找到匹配的 schemaId...

回答 1 投票 0

我应该在哪里获取创建 Kafka 生产者时缺少的属性?

你能解释一下我应该在下面的代码中哪里得到丢失的属性吗? (smth 应该在 lsat 行中) KafkaProperties prop = new KafkaProperties(); KafkaProperties.Producer 生产者 = prop.

回答 1 投票 0

再现UnknownTopicOrPartitionException:此服务器不托管此主题分区

我们在生产环境中遇到了一些异常: UnknownTopicOrPartitionException:此服务器不托管此主题分区 根据我的分析,一种可能的解决方法......

回答 2 投票 0

将Map从KafkaProducer发送到KafkaConsumer

我正在使用java 21和Spring Boot 3.2.0(快照)。我想使用从 KafkaProducer 到 KafkaConsumer 发送 Map>。 KafkaConsumerConfig 类: @配置 噗...

回答 1 投票 0

从控制台生成具有空值(墓碑)的 Kafka 消息

有没有办法在kafka-console-生产者中生成一条具有空值的消息(即,将其标记为压缩器以使用逻辑删除来删除它)? 我尝试过生成“mykey”和“mykey|”。

回答 3 投票 0

如何在Python中处理特定数量的消息后优雅地停止Kafka消费者?

我有一个带有 BashOperator 的 Airflow DAG,它运行 Kafka 生产者,生成随机数量的消息。这些消息由 Kafka 消费者消费,并将它们写入 JSON 文件。然而...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.