apache-kafka 相关问题

Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。

Python kafka 以异步方式获取消费者消息

我正在使用kafka-python。我试图在 kafka_consumer 中定义消息以在后台运行异步。目前,在我看来,它是同步运行的,从而阻止了其他方法。 kafka-pyt 吗...

回答 1 投票 0

带有 Kafka 的 Filebeat 不会将缺少冒号的消息插入到中央日志记录中

我正在使用一个写入日志文件的应用程序。 Filebeat 使用 Kafka 模块将日志数据发送到中央日志服务器。我正在通过将

回答 1 投票 0

Kafka消费者在docker中收不到消息

我正在为 Kafka 创建一个非常基本的消费者/生产者设置。我正在使用 Zookeper 和 Kafka 以及 Docker compose。两者似乎都有联系,但当涉及到生产或消费时,没有什么……

回答 1 投票 0

将 strimzi kafka 集群从 0.36.1 升级到 0.40.0 后,普通监听器的主题授权失败

将strimzi kafka集群从0.36.1升级到0.40.0后。普通侦听器无法从端口 9092 连接到 kafka。它会抛出异常 在消费者、生产者中: “无法连接未授权访问...

回答 1 投票 0

当您没有 Java POJO 时,反序列化 avro 消息的预期结果是什么?

假设我有许多 avro 架构定义:(i) Event1、(ii) Event2、(iii) EventWrapper。 EventWrapper 是一个具有一个字段(称为有效负载)的记录,该字段是 Event1、Event2 的并集。 我也...

回答 1 投票 0

kafka如何使用write-behind?

据我了解,kafka 在写入数据时使用“write-behind”技术。这意味着它将首先将数据写入页缓存(作为脏页),并最终刷新这些脏页

回答 1 投票 0

无法本地连接到容器化 Kafka,错误:缺少 close-

我相信我遇到了一个似乎无法解决的配置问题。目前我有一个容器化的 Kafka 代理,我想在本地连接到它。然而,当我尝试连接时

回答 1 投票 0

连接kafka和cassandra时出现NoHostAvailableException

org.apache.kafka.connect.errors.ConnectException:无法连接到 Cassandra。 在io.lenses.streamreactor.connect.cassandra.source.CassandraSourceTask.start(CassandraSourceTask.scala:86) ...

回答 1 投票 0

Avro Schema,引用 json 文件中的枚举值

我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:

回答 1 投票 0

Avro Schema,参考 yaml 文件中的枚举值

我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:

回答 1 投票 0

Spark 传入 JSON 流处理

我一直在尝试完成一个项目,其中我需要使用kafka将数据流发送到本地Spark来处理传入的数据。但是我无法显示和使用右侧的数据框...

回答 1 投票 0

如何更改 Kafka 作业的 Promtail 抓取间隔

我在 Kubernetes 集群中运行 Kafka,并使用 Promtail 将 Kafka 消息发送到 Loki。现在,从生成消息到 Loki 收到消息之间大约有 5 秒的延迟……

回答 1 投票 0

添加额外的代理节点后Kafka主题分区同步

我们在项目中使用 Confluence Platform 5.5 社区版,有 4 个 Broker 和 3 个 Zookeeper。 我们想在现有集群中再添加一个代理。添加代理后我们如何进行同步

回答 1 投票 0

无法使用Spring Cloud Kafka Binder处理消息

在下面的代码中,我尝试通过 REST 端点调用 processOrder() 来创建消息。然后,我想将 processOrder() 的结果传递给 processShipping() 和 processPayment。 然而...

回答 1 投票 0

AKS 中部署的微服务无法连接到 AKS 中部署的 kafka

我正在将 Spring 微服务项目部署到 AKS,其余服务工作正常,但是当我部署使用 Kafka 的服务时,在我使用 helm 单独部署 Kafka 后, 我重复了

回答 1 投票 0

Spring Cloud Stream Kafka Binder - 在批处理模式下使用 DLQ 时重试不起作用

我使用的是Spring Cloud版本2023.0.1(Spring Cloud Stream版本4.1.1),我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @豆 消费者 我使用的是Spring Cloud Version 2023.0.1(Spring Cloud Stream版本4.1.1),并且我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @Bean Consumer<Message<List<String>>> consumer1() { return message -> { final List<String> payload = message.getPayload(); final MessageHeaders messageHeaders = message.getHeaders(); payload.forEach(System.out::println); payload.forEach(p -> { if(p.startsWith("a")) { throw new RuntimeException("Intentional Exception"); } }); System.out.println(messageHeaders); System.out.println("Done"); }; } 我的application.yml文件看起来像这样 spring: cloud: function: definition: consumer1; stream: bindings: consumer1-in-0: destination: topic1 group: consumer1-in-0-v0.1 consumer: batch-mode: true use-native-decoding: true max-attempts: 3 kafka: binder: brokers: - localhost:9092 default: consumer: configuration: max.poll.records: 1000 max.partition.fetch.bytes: 31457280 fetch.max.wait.ms: 200 bindings: consumer1-in-0: consumer: enableDlq: true dlqName: dlq-topic dlqProducerProperties: configuration: value.serializer: org.apache.kafka.common.serialization.StringSerializer key.serializer: org.apache.kafka.common.serialization.StringSerializer configuration: key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer 我还指定了 ListenerContainerWithDlqAndRetryCustomizer 来自定义重试 @Bean ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) { return new ListenerContainerWithDlqAndRetryCustomizer() { @Override public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group, @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver, @Nullable BackOff backOff) { ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver); container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff)); } @Override public boolean retryAndDlqInBinding(String destinationName, String group) { return false; } }; } 问题 当发生错误时,消息批直接进入DLQ。并且不会尝试重试。 但是问题是,可能会出现暂时性错误,导致批处理处理失败,我希望在将批处理发送到 DLQ 之前重试几次。但我无法让它工作。 我做错了什么? 万一将来有人偶然发现这个问题,我就知道出了什么问题。 我必须从 enableDlq 文件中删除 dlqName、dlqProducerProperties 和 application.yml。 然后就成功了。 在java代码中,我还删除了ListenerContainerWithDlqAndRetryCustomizer并只使用了ListenerContainerCustomizer。 代码看起来像这样: @Bean public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) { return (container, dest, group) -> container.setCommonErrorHandler(errorHandler); } @Bean public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4)); } @Bean public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate, KafkaOperations<?, ?> bytesTemplate, KafkaOperations<?, ?> longTemplate) { Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>(); templates.put(String.class, stringTemplate); templates.put(byte[].class, bytesTemplate); templates.put(Long.class, longTemplate); return new DeadLetterPublishingRecoverer(templates); } @Bean public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); } @Bean public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf); } @Bean public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); }

回答 1 投票 0

Kafka Streams List Serdes:始终为空

我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...

回答 2 投票 0

Smallrye Kafka - 每个通道定义代理 - 不消耗任何事件

我在 Quarkus 应用程序中使用 Smallrye-Kafka 连接到 kafka。到目前为止,要求是“拥有一个对所有通道都有效的引导服务器”。配置...

回答 1 投票 0

创建许多空映射时导致 Java 内存泄漏

在从 Kafka 接收消息的过程中,当我使用 Map.of() 创建空映射时,可能会导致内存泄漏或更密集的 CPU 使用? 可以有多个侦听器,但是...

回答 1 投票 0

如何在 AWS devezium 中设置 LSN 编号以在 LSN 后启动事件日志

如何在 AWS devezium 中设置 LSN 编号以在 LSN 之后启动事件日志,就像我在表 Employee 中有 100 条记录,在备份后,我有这 100 条记录并恢复它,现在我开始...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.