通过KafkaIO发送到Kafka时如何正确捕获异常?
KafkaIO.<String, String>write()
.withBootstrapServers(kafkaBroker)
.withTopic(topic)
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
.withProducerConfigUpdates(kafkaProperties)
我有带有 Failure 对象的 TupleCollection 用于存储异常,但如果未发送消息,我需要重试发送消息(或将此消息写入任何其他源 - 数据库、文件等)。我只需要捕获错误,但是如何使用 KafkaIO.
目前Beam中还没有办法捕获通过KafkaIO写入Kafka时抛出的异常。我目前正在设计支持此功能,但短期内不会可用。
Dataflow 运行程序将自动重试失败的消息(批处理模式下重试 4 次,流式传输模式下无限次)。 (https://cloud.google.com/dataflow/docs/guides/common-errors)