我有一个简单的案例,我需要使用 Spring Cloud Stream 3.2.4 版本来实现。 我有一个预定的消息生产者:
public class MySourceOfMessagesImpl {
private Random random = new Random();
private StreamBridge streamBridge;
public MySourceOfMessagesImpl(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@Scheduled(fixedDelay = 1000)
public void get() {
String payload = "my payload: " + random.nextInt();
String payload2 = "my payload 2: " + random.nextInt();
List<Message<String>> message = List.of(MessageBuilder.createMessage(payload, new MessageHeaders(Map.of())), MessageBuilder.createMessage(payload2, new MessageHeaders(Map.of())));
streamBridge.send("myOutDestination", message);
}
}
和消费者:
public class MyTransformingProcessor implements Consumer<Message<String>> {
private final StreamBridge streamBridge;
public MyTransformingProcessor(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@ServiceActivator(inputChannel = "myOutDestination")
public void accept(Message<String> source) {
//some transformations here
streamBridge.send("mykafkatopic-out-0", source);
}
}
问题是,MyTransformingProcessor 正在获取 ArrayList<> 而不是单个消息。有没有办法指示框架将消息列表作为单独的项目发送?
消息传递中的工作单元确实是一个
Message
,而不是消息列表。
因此,如果您有它们的列表,则必须迭代并对单个项目使用
streamBridge.send()
。
不清楚为什么您需要额外的
Consumer
,而您只需在预定方法中使用streamBridge.send("mykafkatopic-out-0")
即可。