Spring云流产生消息数组,一次消费一条消息

问题描述 投票:0回答:1

我有一个简单的案例,我需要使用 Spring Cloud Stream 3.2.4 版本来实现。 我有一个预定的消息生产者:

public class MySourceOfMessagesImpl {

    private Random random = new Random();
    private StreamBridge streamBridge;

    public MySourceOfMessagesImpl(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
    
    @Scheduled(fixedDelay = 1000)
    public void get() {
        String payload = "my payload: " + random.nextInt();
        String payload2 = "my payload 2: " + random.nextInt();
        List<Message<String>> message = List.of(MessageBuilder.createMessage(payload, new MessageHeaders(Map.of())), MessageBuilder.createMessage(payload2, new MessageHeaders(Map.of())));
        streamBridge.send("myOutDestination", message);
    }
}

和消费者:

public class MyTransformingProcessor implements Consumer<Message<String>> {
    private final StreamBridge streamBridge;

    public MyTransformingProcessor(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @ServiceActivator(inputChannel = "myOutDestination")
    public void accept(Message<String> source) {
        //some transformations here
        streamBridge.send("mykafkatopic-out-0", source);
    }
}

问题是,MyTransformingProcessor 正在获取 ArrayList<> 而不是单个消息。有没有办法指示框架将消息列表作为单独的项目发送?

spring-boot spring-integration spring-cloud-stream
1个回答
0
投票

消息传递中的工作单元确实是一个

Message
,而不是消息列表。

因此,如果您有它们的列表,则必须迭代并对单个项目使用

streamBridge.send()

不清楚为什么您需要额外的

Consumer
,而您只需在预定方法中使用
streamBridge.send("mykafkatopic-out-0")
即可。

© www.soinside.com 2019 - 2024. All rights reserved.