Spring for Apache Kafka(spring-kafka)项目将核心Spring概念应用于基于Kafka的消息传递解决方案的开发。
我已经使用Kafka Streams设置了一个简单的Spring Boot应用程序。 Spring Boot使用AutoConfiguration for Kafka创建工厂。消息是没有消息密钥的Json消息。使用以下...
Spring Kafka - 如何在生成消息时获取时间戳(事件时间)
我需要在kafka使用者应用程序中获取生成消息时的时间戳(事件时间)。我知道timestampExtractor,可以与kafka流一起使用,但是......
在spring boot中创建KafkaTemplate的正确方法
我尝试在spring启动应用程序中配置apache kafka。我阅读本文档并按照以下步骤操作:1)我将此行添加到aplication.yaml:spring:kafka:bootstrap-servers:kafka_host:...
春季启动后,Kafka主题不会在远程kafka上自动创建(并在本地kafka服务器上创建)
1)我在我的机器上启动kafka 2)我使用config启动我的spring启动服务器:@Bean public NewTopic MyTopic(){return new NewTopic(“my-topic”,5,(short)1); } @Bean public ProducerFactory
如何确保Spring Cloud Stream Listener等待处理消息,直到Application在Start上完全初始化?
使用Spring Cloud Stream Kafka应用程序,我们如何确保流监听器等待处理消息,直到完成某些依赖性任务(例如参考数据填充)?以下app无法...
spring kafka流支持的文档显示如下内容:@Bean public KStream kStream(StreamsBuilder kStreamBuilder){KStream stream = ...
Spring kafka Batch Listener-在Batch中手动提交偏移
我正在实现spring kafka批量监听器,它从Kafka主题读取消息列表并将数据发布到REST服务。我想了解REST的偏移管理......
如何在spring配置中创建NewTopic bean列表?
我在yaml文件中配置了10个kafka主题,我需要在aplication start上创建所有主题。但我不明白我怎么能用List做。我可以创建一个bean:@Bean public NewTopic newTopic()...
我有两个Kafka集群,我从数据库中动态获取的IP。我正在使用@KafkaListener来创建监听器。现在我想在运行时创建多个Kafka监听器...
Kafka:序列化时的消息大于您使用max.request.size配置配置的最大请求大小
得到以下错误(Kafka 2.1.0):2018-12-03 21:22:37.873错误37645 --- [nio-8080-exec-1] osksupport.LoggingProducerListener:发送带key ='的消息时抛出异常...
我们正在研究优化磁盘空间使用的选项。在这样做时,我们通过kafka分区查看保留的磁盘空间。现在错误主题使用相同数量的分区......
如何将spring-boot属性设置为:spring.kafka.producer.retries为Integer.MAX_VALUE?是否正在设置此属性,否则默认为0? @参见KIP中的默认kafka https:// cwiki ....
我在我的项目中使用spring Kafka,我希望在消费者根据键和值消耗之前过滤消息。可能吗?
通过concurrentMessageListenerContainer分配主题分区
我正在设置一个ConcurrentMessageListenerContainer
fetch-min-size和max-poll-records sping kafka配置无法按预期工作
我正在使用spring kafka的Spring启动应用程序,该应用程序监听kafka的单个主题,然后隔离各个类别的记录,从中创建一个json文件并...
获取java.lang.OutOfMemoryError:使用Spring kafka生成消息时的Java堆空间
我正在尝试在Pivotal Cloud Foundry上部署一个spring boot应用程序,使用spring kafka向kafka发送消息,并在manifest.yml中获得1GB内存的错误。我的sprint启动应用只有一个......
在库更新之前的Spring Boot / Kafka应用程序中,我使用了以下类org.telegram.telegrambots.api.objects.Update,以便将消息发布到Kafka主题。现在我用...
在我的应用程序中,我使用spring-kafka来消费来自kafka服务器的消息,但是从控制台消费者我得到所有消费者线程的消费者ID,这些消费者线程是活动的TOPIC PARTITION CURRENT -...
如何在spring-boot应用程序启动期间创建许多kafka主题?
我有这样的配置:@Configuration public class KafkaTopicConfig {private final TopicProperties topic; public KafkaTopicConfig(TopicProperties topics){this.topics = ...
我必须记录消费者在春天卡夫卡所花的时间。当kafkaListener方法为每条消息执行时,将记录器放在那里是行不通的。有时也会有一些消息......