将自定义标题添加到Spring Cloud Stream(使用Spring Reactor)

问题描述 投票:0回答:1

作为Spring Reactor的新手,我正在尝试使用Spring云流(使用rabbitMQ)来流数据。在将消息发送到队列之前,我需要添加一些自定义标头。

我的spring-cloud-stream的配置是:

spring:
  cloud:
    stream:
      default:
        producer:
          errorChannelEnabled: true
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

      binders:
        rabbitInput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5672
                host: localhost

        rabbitOutput:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                port: 5670
                host: localhost 

生产商参考:

@SpringBootApplication
@EnableBinding(Processor.class)
public class MessageProcessor {

    public static void main(String[] args) {
        SpringApplication.run(MessageProcessor.class, args);
    }

    @Bean
    Function<Flux<String>, Flux<String>> processMessage(List<String> students) {
        return data -> data.map(d -> match(d, students));

    }
    private String match(String message, List<String> students){
        return Objects.isNull(message) || message.isBlank()
            ? message
            : String.valueOf(matchStudentName(message, students));
    }

    private Optional<String> matchStudentName(String message, List<String> students){
        return students.stream()
        .filter(name -> name.equals(message)).findFirst();
    }
    @Bean
    Function<Flux<String>, Flux<Message<String>>> addHeaders() {
        return data-> data.map(d-> MessageBuilder
            .withPayload( d )
            .setHeader("a", 1)
            .setHeader("b", "999")
            .build());
    }
}

标头已成功添加到消息中,但是它在某个地方被覆盖并且没有传播到消费者。

有人可以分享他们对如何使用Spring Cloud Stream向消息中添加自定义标头的想法。

提前感谢!

spring-boot spring-integration spring-cloud-stream spring-rabbitmq spring-reactor
1个回答
0
投票

[请升级到Hoxton.SR2,它将带来spring-cloud-stream 3.0.2.RELEASE。有一些更新,但是简而言之,您正在生成的消息及其中的标头应保留。

旁注:同样,由于增加了对多个输入/输出函数参数的支持,我们不得不更新函数的绑定名称约定。您可以阅读有关here的更多信息,但对您而言,这意味着您的配置需要快速更新,因为默认情况下不再使用inputoutput,因此您应该使用从函数名派生的名称

spring:
  cloud:
    stream:
      bindings:
        processMessageaddHeaders-in-0:
          binder: rabbitInput
          destination: inputDestination
        processMessageaddHeaders-out-0:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders

。 。 。或者您可以将派生的绑定名称映射到更具描述性的名称(例如inputoutput等),并改用该名称

spring:
  cloud:
    stream:
      bindings:
        input:
          binder: rabbitInput
          destination: inputDestination
        output:
          binder: rabbitOutput
          destination: outputDestination
      function:
        definition: processMessage|addHeaders
        bindings: 
          processMessageaddHeaders-in-0: input  
          processMessageaddHeaders-out-0: output


© www.soinside.com 2019 - 2024. All rights reserved.