spring-kafka 相关问题

Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。

Spring Boot3中无法使用EmbeddedKafka发送Kafka消息

我的 Spring Boot 应用程序(版本 3.2.1)和嵌入式 Kafka 面临问题。问题是KafkaTemplate没有成功发送消息 这是我的测试代码 导入组织....

回答 1 投票 0

使用 kafka 生产者进行 Spring 测试容器

我正在尝试将 Testcontainers 集成到我的 Spring Boot 应用程序中。 我正在使用 Kafka 容器来发送 Kafka 消息。 主要问题是,当我正确地向 kafka 发送消息时,我想

回答 1 投票 0

批量监听器中无法处理反序列化异常

我正在将Spring boot从版本2迁移到版本3。 在我以前的实现中,我基于 v2.8.4 文档,一切正常。 目前,ListenerUtils。

回答 1 投票 0

Spring 将 Java DSL 与 Kafka 集成

我正在用kafka spring集成Java DSL做一个poc 我正在从数据库(DB)读取一行并将该行作为消息发送到 Kafka 主题。请找到下面的代码。 代码正在编译,我可以

回答 1 投票 0

如何连接到云端的 Kafka 生产者

我对使用已经在云上创建的kafka生产者和使用新的Producer生产者=新的KafkaProducer以编程方式创建新的生产者感到困惑 我对使用已经在云上创建的 kafka 生产者和使用 new Producer Producer = new KafkaProducer (props)以编程方式创建新的 prodcuer 之间感到困惑。我如何调用已经存在的生产者,即在云上创建的生产者。假设它是一个简单的java类,没有使用任何像spring这样的框架 创建新生产者与调用现有生产者 连接到已在云上创建的 Kafka 生产者需要配置 Kafka 客户端以连接到远程 Kafka 集群。以下是在简单的 Java 类中使用 Kafka 生产者 API 连接到在云上创建的现有 Kafka 生产者的步骤,无需任何 Spring 等框架: 配置 Kafka Producer 属性: 从云提供商或管理员处获取现有 Kafka 生产者的配置详细信息(例如引导服务器、安全设置)。相应地更新您的属性。 Properties props = new Properties(); props.put("bootstrap.servers", "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092"); // Add other required properties like security settings, serializers, etc. 创建Kafka生产者: 使用 KafkaProducer 类创建具有提供的属性的 Kafka 生产者实例。 Producer<String, String> producer = new KafkaProducer<>(props); 发送消息: 您可以使用send方法向云端Kafka集群中的主题发送消息。 ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key", "value"); producer.send(record); 关闭生产者: 使用完生产者释放资源后,请确保将其关闭。 producer.close(); 完整的 Java 类可能如下所示: import org.apache.kafka.clients.producer.*; import java.util.Properties; public class CloudKafkaProducer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092"); // Add other required properties like security settings, serializers, etc. Producer<String, String> producer = new KafkaProducer<>(props); try { ProducerRecord<String, String> record = new ProducerRecord<>("your-topic", "key", "value"); producer.send(record); } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } } 确保将 "cloud-kafka-broker1:9092,cloud-kafka-broker2:9092" 和 "your-topic" 分别替换为实际的云 Kafka 代理地址和主题名称。此外,根据云提供商的规范配置其他必要的属性。

回答 1 投票 0

EmbeddedKafka 身份验证失败,原因是:与客户端机制 PLAIN 发生意外的握手请求,启用的机制为 []

我使用EmbeddedKafkaBroker编写了一个简单的测试,我创建了一个测试生产者并发送了一条消息,但是我的KafkaListener没有被触发,所以测试每次都失败。有没有办法测试我的Ka...

回答 2 投票 0

Spring云流项目,Leyton版本忽略kafka DLQ配置

我用Spring Cloud Streaming做了一些实验,其中kafka是数据源。 需要配置 dlq 而我的配置不支持它。 所以我打开了开源示例,它...

回答 1 投票 0

Spring 流项目,Leyton 版本忽略 kafka DLQ

我用Spring Cloud Streaming做了一些实验,其中kafka是数据源。 需要配置 dlq 而我的配置不支持它。 所以我打开了开源示例,它...

回答 1 投票 0

我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题

我在使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题 该方法是否应该将左流中的所有记录保留 24 小时以查找匹配

回答 1 投票 0

批量提交偏移量,而不是在消费时逐一提交

假设 1 个线程正在处理 msg0,第 2 个线程正在处理 msg1。现在,由于并行性性质,msg2 已得到处理并提交其从 0->1 的偏移量。但是由于任何原因而消耗 msg0 时...

回答 1 投票 0

Spring boot 2.3.10 Kafka 通过手动确认进行重试尝试

我有一个配置为手动确认的kafka侦听器。 @KafkaListener( 主题= [“我的主题”], groupId = "组 ID", 容器工厂=“

回答 1 投票 0

Kafka 反序列化错误并记录分区、主题和偏移量

我正在使用 DefaultKafkaConsumerFactory 上发送的 ErrorHandlingDeserializer 处理反序列化错误。 尝试(ErrorHandlingDeserializer errorHandlingDeserializer = new

回答 1 投票 0

从另一个配置类将主题注入到@KafkaListener注释

我有一个java配置类是这样说的 @配置 公共类 MyConfig { @Value("${kafka.topic:default_topic}") 字符串kafkaTopic; } 我当前的 KafkaListener 有一个

回答 1 投票 0

Kakfa createTopics 失败

我是 Kafka 新手,我正在尝试使用命令行创建一个主题,但它给了我以下错误: C:\kafka in\windows>kafka-topics.bat --create --topictutorialspedia --boot...

回答 2 投票 0

如何验证 JSON 以确保所有字段非空且年龄 >0?

通过 Rest API 端点,我接受 JSON 并使用 Kafka Producer 将 JSON 发布到主题。 JSON 将具有以下格式: { "id": "唯一标识符", “名字&...

回答 1 投票 0

简单的汇合kafka spring boot应用程序不起作用,似乎没有读取application.properties

我通过 Confluence 站点创建了一个 Confluence kafka 代理,并通过 CLI 创建了一个主题。 然后我通过initializr和kafka lib创建了一个简单的spring boot项目。 然后创建了简单的生产者并

回答 1 投票 0

Java 虚拟线程导致应用程序挂在 logback 中的 ReentrantLocks 上

使用 Spring Boot 3.2.1 和 Tomcat 10 从 java 17 迁移到 java 21 后遇到问题。启用并使用虚拟线程(例如,我可以看到 tomcat 生成新的虚拟线程...

回答 1 投票 0

合流云启动器 spring-boot 代码因不兼容类型而损坏:找到:CompletableFuture,必需:ListenableFuture

创建kafka的人confluence cloud提供的源代码似乎不起作用,我不知道如何修复。错误是: 不兼容的类型。发现:'java.util.concurrent.

回答 1 投票 0

在 Spring Kafka 3 中启用跟踪的配置

我正在尝试在 Kafka 模板中启用可观察性来为事件创建跨度。我正在使用 Jaeger 的本地实例。在 Jaeger 实例中,我可以看到与 S3 bu 交互相关的跨度...

回答 1 投票 0

从 Spring Kafka 2.8.x 升级到 3.0.9 后自定义 recordInterceptor 不工作的问题

我最近从 Spring Kafka 2.8.x 升级到 3.0.9,运行测试后,我注意到我的自定义 recordInterceptor 不再按预期运行。我们的 recordInterceptor 旨在...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.