Apache Beam 写入 Kafka 时的错误处理

问题描述 投票:0回答:1

通过KafkaIO发送到Kafka时如何正确捕获异常?

KafkaIO.<String, String>write()
                  .withBootstrapServers(kafkaBroker)
                  .withTopic(topic)
                  .withKeySerializer(StringSerializer.class)
                  .withValueSerializer(StringSerializer.class)
                  .withProducerConfigUpdates(kafkaProperties)

我有带有 Failure 对象的 TupleCollection 用于存储异常,但如果未发送消息,我需要重试发送消息(或将此消息写入任何其他源 - 数据库、文件等)。我只需要捕获错误,但是如何使用 KafkaIO.write() 来做到这一点?

apache-kafka error-handling apache-beam apache-beam-kafkaio
1个回答
0
投票

目前Beam中还没有办法捕获通过KafkaIO写入Kafka时抛出的异常。我目前正在设计支持此功能,但短期内不会可用。

Dataflow 运行程序将自动重试失败的消息(批处理模式下重试 4 次,流式传输模式下无限次)。 (https://cloud.google.com/dataflow/docs/guides/common-errors

© www.soinside.com 2019 - 2024. All rights reserved.