Apache Kafka是一个分布式流媒体平台,用于存储和处理高吞吐量数据流。
我正在使用kafka-python。我试图在 kafka_consumer 中定义消息以在后台运行异步。目前,在我看来,它是同步运行的,从而阻止了其他方法。 kafka-pyt 吗...
带有 Kafka 的 Filebeat 不会将缺少冒号的消息插入到中央日志记录中
我正在使用一个写入日志文件的应用程序。 Filebeat 使用 Kafka 模块将日志数据发送到中央日志服务器。我正在通过将
我正在为 Kafka 创建一个非常基本的消费者/生产者设置。我正在使用 Zookeper 和 Kafka 以及 Docker compose。两者似乎都有联系,但当涉及到生产或消费时,没有什么……
将 strimzi kafka 集群从 0.36.1 升级到 0.40.0 后,普通监听器的主题授权失败
将strimzi kafka集群从0.36.1升级到0.40.0后。普通侦听器无法从端口 9092 连接到 kafka。它会抛出异常 在消费者、生产者中: “无法连接未授权访问...
当您没有 Java POJO 时,反序列化 avro 消息的预期结果是什么?
假设我有许多 avro 架构定义:(i) Event1、(ii) Event2、(iii) EventWrapper。 EventWrapper 是一个具有一个字段(称为有效负载)的记录,该字段是 Event1、Event2 的并集。 我也...
据我了解,kafka 在写入数据时使用“write-behind”技术。这意味着它将首先将数据写入页缓存(作为脏页),并最终刷新这些脏页
我相信我遇到了一个似乎无法解决的配置问题。目前我有一个容器化的 Kafka 代理,我想在本地连接到它。然而,当我尝试连接时
连接kafka和cassandra时出现NoHostAvailableException
org.apache.kafka.connect.errors.ConnectException:无法连接到 Cassandra。 在io.lenses.streamreactor.connect.cassandra.source.CassandraSourceTask.start(CassandraSourceTask.scala:86) ...
我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:
我正在为 kafka 主题定义一个带有 Enum 字段的 avro 模式。 avro 模式将上传到 kafka 模式注册表。 我在 github 存储库中有一个 json 文件,定义如下:
我一直在尝试完成一个项目,其中我需要使用kafka将数据流发送到本地Spark来处理传入的数据。但是我无法显示和使用右侧的数据框...
我在 Kubernetes 集群中运行 Kafka,并使用 Promtail 将 Kafka 消息发送到 Loki。现在,从生成消息到 Loki 收到消息之间大约有 5 秒的延迟……
我们在项目中使用 Confluence Platform 5.5 社区版,有 4 个 Broker 和 3 个 Zookeeper。 我们想在现有集群中再添加一个代理。添加代理后我们如何进行同步
无法使用Spring Cloud Kafka Binder处理消息
在下面的代码中,我尝试通过 REST 端点调用 processOrder() 来创建消息。然后,我想将 processOrder() 的结果传递给 processShipping() 和 processPayment。 然而...
AKS 中部署的微服务无法连接到 AKS 中部署的 kafka
我正在将 Spring 微服务项目部署到 AKS,其余服务工作正常,但是当我部署使用 Kafka 的服务时,在我使用 helm 单独部署 Kafka 后, 我重复了
Spring Cloud Stream Kafka Binder - 在批处理模式下使用 DLQ 时重试不起作用
我使用的是Spring Cloud版本2023.0.1(Spring Cloud Stream版本4.1.1),我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @豆 消费者 我使用的是Spring Cloud Version 2023.0.1(Spring Cloud Stream版本4.1.1),并且我以批处理模式编写了一个简单的kafka消费者来模拟错误场景。 @Bean Consumer<Message<List<String>>> consumer1() { return message -> { final List<String> payload = message.getPayload(); final MessageHeaders messageHeaders = message.getHeaders(); payload.forEach(System.out::println); payload.forEach(p -> { if(p.startsWith("a")) { throw new RuntimeException("Intentional Exception"); } }); System.out.println(messageHeaders); System.out.println("Done"); }; } 我的application.yml文件看起来像这样 spring: cloud: function: definition: consumer1; stream: bindings: consumer1-in-0: destination: topic1 group: consumer1-in-0-v0.1 consumer: batch-mode: true use-native-decoding: true max-attempts: 3 kafka: binder: brokers: - localhost:9092 default: consumer: configuration: max.poll.records: 1000 max.partition.fetch.bytes: 31457280 fetch.max.wait.ms: 200 bindings: consumer1-in-0: consumer: enableDlq: true dlqName: dlq-topic dlqProducerProperties: configuration: value.serializer: org.apache.kafka.common.serialization.StringSerializer key.serializer: org.apache.kafka.common.serialization.StringSerializer configuration: key.deserializer: org.apache.kafka.common.serialization.StringDeserializer value.deserializer: org.apache.kafka.common.serialization.StringDeserializer 我还指定了 ListenerContainerWithDlqAndRetryCustomizer 来自定义重试 @Bean ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) { return new ListenerContainerWithDlqAndRetryCustomizer() { @Override public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group, @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver, @Nullable BackOff backOff) { ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template, dlqDestinationResolver); container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff)); } @Override public boolean retryAndDlqInBinding(String destinationName, String group) { return false; } }; } 问题 当发生错误时,消息批直接进入DLQ。并且不会尝试重试。 但是问题是,可能会出现暂时性错误,导致批处理处理失败,我希望在将批处理发送到 DLQ 之前重试几次。但我无法让它工作。 我做错了什么? 万一将来有人偶然发现这个问题,我就知道出了什么问题。 我必须从 enableDlq 文件中删除 dlqName、dlqProducerProperties 和 application.yml。 然后就成功了。 在java代码中,我还删除了ListenerContainerWithDlqAndRetryCustomizer并只使用了ListenerContainerCustomizer。 代码看起来像这样: @Bean public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) { return (container, dest, group) -> container.setCommonErrorHandler(errorHandler); } @Bean public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) { return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4)); } @Bean public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate, KafkaOperations<?, ?> bytesTemplate, KafkaOperations<?, ?> longTemplate) { Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>(); templates.put(String.class, stringTemplate); templates.put(byte[].class, bytesTemplate); templates.put(Long.class, longTemplate); return new DeadLetterPublishingRecoverer(templates); } @Bean public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); } @Bean public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate<>(pf); } @Bean public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) { return new KafkaTemplate<>(pf, Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)); }
Kafka Streams List Serdes:始终为空
我做错了什么? 我正在编写一个 ProcessorSupplier,用于将 n 条记录聚合为一条。为此,我正在使用 List Serdes ... 我的问题是 ArrayList 总是空的。 使用 Java 21 和 Ka...
Smallrye Kafka - 每个通道定义代理 - 不消耗任何事件
我在 Quarkus 应用程序中使用 Smallrye-Kafka 连接到 kafka。到目前为止,要求是“拥有一个对所有通道都有效的引导服务器”。配置...
在从 Kafka 接收消息的过程中,当我使用 Map.of() 创建空映射时,可能会导致内存泄漏或更密集的 CPU 使用? 可以有多个侦听器,但是...
如何在 AWS devezium 中设置 LSN 编号以在 LSN 后启动事件日志
如何在 AWS devezium 中设置 LSN 编号以在 LSN 之后启动事件日志,就像我在表 Employee 中有 100 条记录,在备份后,我有这 100 条记录并恢复它,现在我开始...