kafka-consumer-api 相关问题

用于与Apache Kafka使用者API相关的问题

负载测试kafka消费者

(我正在编辑问题,因为我认为它不够清楚) 如何对我的 kafka 消费者进行负载测试? 我看过很多关于负载测试 apache kafka 的文章,但没有一篇关于负载测试 con...

回答 2 投票 0

Spring-kafka:打印RecordHeaders的正确方法是什么?

刚刚检查了日志,发现 ConsumerRecord 标头以绝对不可读的方式打印,原因是 RecordHeader 将值保存为 byte[],结果在日志中我看到了

回答 1 投票 0

在 Spring Boot 应用程序中实现反应式 Kafka 监听器

我正在尝试在 Spring boot 应用程序中实现反应式 kafka 消费者,我正在查看这些示例: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/...

回答 1 投票 0

使用offsets_for_times从时间戳消费

尝试使用 confluence_kafka.AvroConsumer 来消费给定时间戳的消息。 如果标志: # 创建一个列表 topic_partitions_to_search = 列表( 映射(lambda p:TopicPartition('

回答 2 投票 0

如何消费kafka消息上的最后一条消息或者根据时间戳消费消息?

def kafkaa(自我,auto_offset_reset,超时= 500): group_name = "群组名称" config = {“bootstrap.servers”:“服务器”, “是...

回答 1 投票 0

Spring Kafka 不可重试异常过滤器

我正在尝试实现动态 Kafka 异常处理,特别是在某些条件下重试,但找不到任何方法来过滤不应该仅由其类重试的异常。我的...

回答 1 投票 0

将整批发送到 dlt,无需重试

我正在使用spring kafka,并且我有一个用java spring boot编写的kafka消费者。我的消费者按批次消费,相关配置 bean 如下所示。 @豆 公共消费者工厂 我正在使用 spring kafka,并且我有一个用 java spring boot 编写的 kafka 消费者。我的消费者批量消费和相关配置bean如下。 @Bean public ConsumerFactory<String, Object> consumerFactory() { Map<String, Object> config = new HashMap<>(); // default configs like bootstrap servers, key and value deserializers are here config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5"); return new DefaultKafkaConsumerFactory<>(config); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.DEBUG); factory.setBatchListener(true); return factory; } 我使用消息并将这些消息发送到 API 端点。如果 api 不可用或者其余模板抛出错误,我想将整个批次发送到 DLT 而无需重试。 我想要做的是将整个批次发送到 DLT,而不重试。如果我们抛出 BatchListenerFailedException,则批次中拥有特定索引号的消息将发送到 DLT。在 BatchListenerFailedException 中,我们只能传递一个整数值作为索引值,而不是一个列表。但我想要的是将整个批次按原样发送到 DLT 主题,而无需重试。有办法实现吗? 我的Spring Kafka版本是2.8.6 编辑 我的默认错误处理程序如下 @Bean public CommonErrorHandler commonErrorHandler() { ExponentialBackOffWithMaxRetries exponentialBackOffWithMaxRetries = new ExponentialBackOffWithMaxRetries(5); exponentialBackOffWithMaxRetries.setInitialInterval(my val); exponentialBackOffWithMaxRetries.setMultiplier(my val); exponentialBackOffWithMaxRetries.setMaxInterval(my val); DefaultErrorHandler errorHandler = new DefaultErrorHandler( new DeadLetterPublishingRecoverer(kafkaTemplate(), (record, exception) -> new TopicPartition(record.topic() + "-dlt", record.partition())), exponentialBackOffWithMaxRetries); errorHandler.addNotRetryableExceptions(ParseException.class); errorHandler.addNotRetryableExceptions(EventHubNonRetryableException.class); return errorHandler; } 在我的例子中使用ExponentialBackOffWithMaxRetries而不是FixedBackOff。就我而言,我有 3 个场景。 1 - 重试消息并将其发送到 DLT(抛出除 BatchListenerFailedException 之外的任何其他异常) 2 - 将批次中的几条消息发送到 DLT,无需重试(为此使用 BatchListenerFailedException) 3 - 将整个批次发送到 DLT,无需重试。 第三个是我苦苦挣扎的地方。如果我发送一些其他异常,那么它会重试几次。 (即使我用FixedBackOff代替ExponentialBackOffWithMaxRetries) 扔BatchListenerFailedException以外的其他东西;将 DefaultErrorHandler 与 DeadLetterPublishingRecoverer 一起使用,无需重试 (new FixedBackOff(0L, 0L))。 编辑 从版本3.0.0、2.9.3、2.8.11开始,您可以为批量错误配置不可重试的异常。 https://github.com/spring-projects/spring-kafka/issues/2459 看 /** * Add exception types to the default list. By default, the following exceptions will * not be retried: * <ul> * <li>{@link DeserializationException}</li> * <li>{@link MessageConversionException}</li> * <li>{@link ConversionException}</li> * <li>{@link MethodArgumentResolutionException}</li> * <li>{@link NoSuchMethodException}</li> * <li>{@link ClassCastException}</li> * </ul> * All others will be retried, unless {@link #defaultFalse()} has been called. * @param exceptionTypes the exception types. * @see #removeClassification(Class) * @see #setClassifications(Map, boolean) */ @SafeVarargs @SuppressWarnings("varargs") public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) { add(false, exceptionTypes); notRetryable(Arrays.stream(exceptionTypes)); } 请注意,2.8.x 现已不再支持 OSS。 https://spring.io/projects/spring-kafka#support 您能就第二个场景的实施与我联系吗?这将是一个很大的帮助。 谢谢

回答 2 投票 0

如何将LinkedHashMap转换为自定义对象?

我的 Kafka Consumer 端有 2 个主题:主题 A 和主题 B。两者都采用相同的方法。详细方法如下; @Slf4j @服务 公共类 KafkaConsumerService { @KafkaListe...

回答 1 投票 0

如何将LinkedHasMap转换为自定义对象?

我的 Kafka Consumer 端有 2 个主题。主题 A 和主题 B。两者都采用相同的方法。详细方法如下; @Slf4j @服务 公共类 KafkaConsumerService { @KafkaListe...

回答 1 投票 0

Kafka Consumer 无法使用 KafkaJS 在本地工作

我正在尝试使用 KafkaJS 在本地运行 Kafka 消费者。它表明消费者正在运行,但它没有从生产者主动向其中推送事件的主题进行消费。消费者

回答 1 投票 0

Kafka 状态存储作为本地持久化选项?

我有一个简单的分布式系统架构,其中一个生产者系统将事件写入一个 kafka 主题。这些事件基本上只被一个系统消耗。这位消费者整晚都在加载...

回答 1 投票 0

Python kafka消费者组id问题

据我所知, kafka中引入分区和(消费者)组的概念来实现并行性。我正在通过 python 与 kafka 合作。我有一个特定的主题,它有(比如说)2 个分区。

回答 4 投票 0

写入死信主题以防反序列化异常

我有一个 Spring Boot 应用程序,它有一个带有 @KafkaListener 的简单 Consumer。我有阻止重试逻辑,可以按预期工作,但如果出现反序列化异常,我想存储...

回答 2 投票 0

Kafka 工作效率低下(Consumer Python)

我的 apache kafka 有问题,问题是; 当我从后端(spring-boot)向我的主题发送消息时,python 客户端不会立即收到它,而且根本没有收到......

回答 1 投票 0

使用 subscribe() 和 allocate() 读取 kafka 主题的奇怪区别

我的任务是统计Kafka主题中的消息(有些有一个分区,有些有多个分区)。我尝试了两种技术:一种使用 subscribe(),另一种使用 allocate()。 完整代码: #!/usr/bin/env py...

回答 1 投票 0

使用kafkaListener和syncCommit,但它们不起作用

我有一个应用程序,它侦听 Kafka 主题并在 1 个线程中从中读取 1 条消息。读完一条消息后,有一定的逻辑,期间有两个选择——一切都是

回答 1 投票 0

所有消息消费完后如何关闭kafka消费者?

我有以下程序来消费所有传入 Kafka 的消息。 从 kafka 导入 KafkaConsumer 消费者 = KafkaConsumer('my_test_topic', group_id='我的组', ...

回答 4 投票 0

我的Kafka Consumer重试10次异常,通过简单添加MDC日志来停止重试?

我有一个 Kafka 监听器,我向主题发送一条 Kafka 消息,我的监听器接收了 10 次(第二次是在第一个完成其处理功能之后,第三次是在

回答 1 投票 0

即使组 id 和客户端 id 设置正确,Kafka Consumer 也没有收到来自 Topic 的任何消息

使用 @InputChannelAdaptor ,我正在从主题轮询消息,但如果我从命令行发布 json,则不会收到任何消息。但如果我传递任何文本,它会引发异常。我正在尝试使用 json

回答 1 投票 0

Spring Kafka 使用 AckMode.RECORD 手动确认

我正在尝试将 Kafka 用于我拥有的以下用例。 我们的生产者将向 kafka 主题生成消息。我们的消费者本质上很慢,因为每条消息处理都会有所不同......

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.