在将消息发送到Spring Cloud Streams中的DLQ之前添加自定义信息

问题描述 投票:0回答:1

我仅通过使用属性来使用Spring Cloud Streams和默认的Spring Retry机制。它运行良好,可以重试消息,然后转到DLQ。到目前为止一切都很顺利。现在出现了问题...

我需要在消息中添加一些自定义信息,然后再将其从我的服务发送到DLQ。它们足够简单,可以帮助我识别失败的消息,而无需接触通用的有效负载。

可能我可以添加自定义标头或将其包装在已知模型中,在该模型中我可以检索所需的信息-无论是哪种方式,我都需要截取/修改消息。

最简单的方法是什么,而又不花多少钱?我的意思是,我们只使用简单的配置来进行重试,所以“成本”是指将配置与其他内容交换。还是谢谢!

spring spring-cloud-stream spring-retry
1个回答
0
投票

使用Kafka活页夹,您可以将ProducerInterceptor添加到kafka生产者配置interceptor.classes

/**
 * This is called from {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
 * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before key and value
 * get serialized and partition is assigned (if partition is not specified in ProducerRecord).
 * <p>
 * This method is allowed to modify the record, in which case, the new record will be returned. The implication of modifying
 * key/value is that partition assignment (if not specified in ProducerRecord) will be done based on modified key/value,
 * not key/value from the client. Consequently, key and value transformation done in onSend() needs to be consistent:
 * same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
 * as expected.
 * <p>
 * Similarly, it is up to interceptor implementation to ensure that correct topic/partition is returned in ProducerRecord.
 * Most often, it should be the same topic/partition from 'record'.
 * <p>
 * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
 * <p>
 * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
 * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
 * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
 * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
 * the record already modified by other interceptors. However, building a pipeline of mutable interceptors that depend on the output
 * of the previous interceptor is discouraged, because of potential side-effects caused by interceptors potentially failing to
 * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
 * is caught, logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
 * or otherwise the client.
 *
 * @param record the record from client or the record returned by the previous interceptor in the chain of interceptors.
 * @return producer record to send to topic/partition
 */
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

生产者记录包含目标主题名称;您可以在此处添加/删除标题。

目前对于RabbitMQ活页夹没有类似的钩子。如果您使用的是活页夹,请根据活页夹在GitHub上打开新功能。

© www.soinside.com 2019 - 2024. All rights reserved.