spring-kafka 相关问题

Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。

在java项目中使用kafka进行更新和插入操作是不是一个好主意?

在java项目中使用kafka进行更新和插入操作是一个好主意吗?我们正在将遗留批处理项目从 Perl 迁移到 Spring Boot 微服务,并且我们计划使用 kafka 作为 DB

回答 1 投票 0

每个任务的 KafkaProducer 或单例 KafkaProducer

我有几个并行运行的任务(ExecutorService 和 Runnable 任务)。对所有这些都使用一个 KafkaProducer 实例(Spring singleton bean)是一个好的选择吗?在这种情况下,如何

回答 1 投票 0

org.springframework.kafka.support.serializer.ErrorHandlingDeserializ'类型为serializer.ErrorHandlingDeserializer],同时设置构造函数参数

通过参考:How to handle the exception at Consumer in Spring XMl App?,我试图在 xml bean 中转换 Java bean 依赖关系并面临以下错误。 有人可以解释一下...

回答 1 投票 0

Kafka Consumer 无法间歇性解析监听器方法

我在卡夫卡消费者端遇到了下面的异常。令人惊讶的是,这个问题与旧版本的代码不一致(具有完全相同的配置,但有一些新的不相关...

回答 1 投票 0

Spring XMl App中consumer端如何处理异常?

如何在Spring XML + Kafka App中进行错误处理?我正在使用 JSON 来生成和消费消息,但是当消费者获取垃圾数据时,它会运行无限循环。 已在此处上传代码:https://

回答 1 投票 0

Spring boot kafka - 如何告诉 JsonDeserializer 忽略类型标头?

Spring 的 Kafka 生产者将类型标头嵌入到消息中,该消息指定消费者应将消息反序列化到哪个类。当生产者不使用 Spring Kafka 时,这是一个问题...

回答 2 投票 0

找不到所需类型:BiConsumer

我想将这段Java代码迁移到Spring Boot 3: 公共无效发送(字符串主题,K键,V消息,BiConsumer,Throwable>回调){ 完整的未来 我想将此 Java 代码迁移到 Spring Boot 3: public void send(String topic, K key, V message, BiConsumer<SendResult<K, V>, Throwable> callback) { CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message); if (Objects.nonNull(callback)) { future.whenComplete(callback); } } ...... // call kafka method kafkaProducer().send("topic", "key", "messge", listenableFutureCallback("12345")); ...... private ListenableFutureCallback listenableFutureCallback(String userId) { return new ListenableFutureCallback() { @Override public void onFailure(Throwable ex) { ....... } @Override public void onSuccess(Object result) { ........ } }; } 我在这一行遇到错误:listenableFutureCallback("12345") Required type: BiConsumer <org.springframework.kafka.support.SendResult<java.lang.String,java.lang.String>, java.lang.Throwable> Provided: ListenableFutureCallback 你知道我应该如何迁移/替换 ListenableFutureCallback 以便从 BiConsumer Java 方法返回 listenableFutureCallback 吗? 这似乎是 BiConsumer 导入的问题。您应该使用“java.util.function.BiConsumer”中的 BiConsumer,而不是“org.apache.kafka.commonKafkaFuture.BiConsumer”。 PS:据我了解,ListenableFutureCallback 在 Spring 6.0 中已被弃用。请尝试从您的代码中删除它并使用 CompletableFuture 代替。 https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/util/concurrent/ListenableFutureCallback.html

回答 1 投票 0

Spring boot @KafkaListener 不使用框架中提到的 SPEL 监听主题名称

我开发了一个kafka监听器。以下是我设计的不同课程: 类 KafkaProperties { 公共字符串主题ZZZ; 公共字符串组ID; ... } 类 KafkaConfig { ...

回答 1 投票 0

java.lang.RuntimeException:不可为 null 的字段 authBytes 从 Spring Boot 序列化为 null 到 Azure 事件中心

我正在使用 v2.7.5 开发一个简单的 Spring Boot 应用程序,并尝试将消息发送到事件中心。有人可以指导我吗? 错误: java.lang.RuntimeException:不可为 null 的字段 authBytes 为

回答 2 投票 0

生产者无法向 Kafka Broker 发送消息

@SpringBootApplication 公共类 SpringKafkaApplication { 公共静态无效主(字符串[] args){ SpringApplication.run(SpringKafkaApplication.class, args); } @Autowired

回答 1 投票 0

当密码包含“:”字符时,JaasConfig 会错误地解析应用配置

我有 SASL 身份验证问题,该身份验证使用 JaasConfig 配置来构建登录连接字符串 我的连接字符串看起来像 required username= pass...

回答 1 投票 0

在 Docker-compose 中使用 Kafka 运行 Spring Boot 应用程序

我有应用程序(kafka客户端)。我有下一个属性: spring.kafka.bootstrap-servers=127.0.0.1:29092 spring.kafka.consumer.group-id=mc3 我在 Docker-compose 中有 kafka: 卡夫卡: 图片:

回答 1 投票 0

Kafka - 反序列化 Consumer 中的对象

我们正在考虑在我们的消息传递中使用 Kafka,我们的应用程序是使用 Spring 开发的。所以,我们打算使用spring-kafka。 生产者将消息作为 HashMap 对象放入...

回答 2 投票 0

KafkaTemplate 和 KafkaProducer 发送方法的区别?

我的问题是在使用kafka的Spring Boot微服务中什么适合使用KafkaTemplate.send()或KafkaProducer.send() 我使用 KafkaConsumer 而不是 KafkaListener 来轮询记录

回答 1 投票 0

如何在 Kubernetes multipod 部署中使用 spring kafka 处理 Kafka 容器生命周期

我正在使用 Spring kafka 实现,我需要通过 REST API 启动和停止我的 kafka 消费者。为此,我正在使用 KafkaListenerEndpointRegistry endpointRegistry 端点注册表。

回答 2 投票 0

某些消息随机获取 IAMClientCallbackHandler 类未找到异常

在我们目前的用例中,我们一条一条地发送消息[不是批量]以维持传入的顺序顺序,同时一条一条地发送大约 5000 多条消息或使用

回答 1 投票 0

Spring Kafka - offsetsForTimes 方法对某些分区返回 null

我使用 spring-boot 2.2.11、spring-kafka 2.4.11 和 apache kafka-clients 2.4.1 我让我的消费者实现了 ConsumerAwareRebalanceListener,并且我试图在特定时间戳之后寻求偏移

回答 2 投票 0

使用 Spring Kafka 的 Kafka Producer 在尝试生产时需要很长时间才能刷新元数据

我们对 Kafka 有一个标准设置,我们使用 @KafkaLisetener 来侦听某个主题上的事件,并使用 KafkaTemplate 将响应发送到另一个主题。 为了提高吞吐量,我们增加...

回答 1 投票 0

如何测试@onFailure方法?

我怎样才能转到“@Override public void onFailure(Throwable ex) { ... }”? 它总是“@Override public void onSuccess(SendResult result) {...}。 我...

回答 2 投票 0

即使在确认消息后,重复消息也会多次发送到@Dlthandler

我正在尝试测试 Kafka 的 @RetryableTopic 功能,在重试消息 3 次后,我想将其推送到 Kafka SQS。我收到来自重试线程的呼叫 4 次并呼叫 DltHand...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.