从 Kafka 到 Rabbit 的 Spring Cloud Stream

问题描述 投票:0回答:1

我正在尝试使用Kafka消息密钥作为rabbitmq消息的路由密钥将消息从kafka的主题发送到多个rabbitMQ队列。 管道如下所示:kafka的主题 -> Spring Cloud Stream应用程序 ->rabbitmq

RabbitMQ 设置

兑换方式:直接兑换。所有队列都绑定到Exchange,路由键和队列名称相同。

SCS 配置

  cloud:
    stream:
      default-binder: rabbit
      function:
        definition: upstreamProcessor
      bindings:
        upstreamProcessor-in-0:
          destination: here is the kafka topic
          group: ms-upstream-provider
          binder: kafka
        upstreamProcessor-out-0:
          destination: here is the rabbitMQ exchange name
          binder: rabbit
      rabbit.bindings.upstreamProcessor-out-0.producer:
        exchangeType: direct

使用upstreamProcessor函数我正在读取传入消息,创建新消息并发送到rabbitMQ。

但我不明白如何为每个传出消息添加路由密钥?我有很多不同的队列(> 6000),我想在代码中而不是在设置中设置路由键 在文档和网站上,我没有找到如何以编程方式设置路由键

预先感谢您的建议。

public Function<Message<String>, Message<JsonNode>> upstreamProcessor() {
    return message -> {
        try {
            JsonNode node = mapper.readTree(message.getPayload());
            String routingKey= message.getHeaders().get(KafkaHeaders.RECEIVED_KEY).toString();
            return MessageBuilder.withPayload(node)
                    .setHeader("routingKey", routingKey)
                    .build();
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    };
}
apache-kafka rabbitmq spring-cloud-stream
1个回答
0
投票

您可以按照您所做的那样在代码中设置路由键,即

.setHeader("routingKey," routingKey).
然后您需要通过配置在生产者绑定上设置
routingKeyExpression
spring.cloud.stream.rabbit.bindings.upstreamProcessor-out-0.producer.routing-key-expression=headers['routingKey']

这样,通过

upstreamProcessor-out-0
绑定进行的任何出站发布都将参考
routingKey
标头作为 AMQP 出站端点使用的路由键。请注意,标头名称可以是任何内容 - 不一定是
routingKey

© www.soinside.com 2019 - 2024. All rights reserved.