我对自己拥有的spring cloud stream有几点要求:
我已经研究过使用一个函数,但我无法解决如何在给定一个主题的情况下发送多条消息,我也研究过使用消费者和供应商,但我看不到它工作得很好。我目前发送消息的方式是使用消费者,然后使用 StreamBridge 通过副作用发送。
@Bean
@SuppressWarnings("unchecked")
public Consumer<KStream<String, String>> generateMessage() {
return messages -> {
final Map<String, KStream<String, String>> splitMessages =
branchOutput(filterMessages(messages));
KStream<String, MessageData>[] ksArray = splitMessages
.values()
.stream()
.map(message ->
message.mapValues((key, jsonMessage) -> {
try {
return new MessageData(dataTransformService
.transformMessage(key, jsonMessage, extractTopic(jsonMessage)),
removeTopic(jsonMessage), "");
} catch (ClassNotFoundException e) {
return new MessageData(Collections.singletonList(CLASS_NOT_FOUND_EXCEPTION),
removeTopic(jsonMessage), e.getMessage());
}
}))
.toArray(KStream[]::new);
ksArray[0].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_1, value.getOriginalMessage(), value.getError()));
ksArray[1].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_2, value.getOriginalMessage(), value.getError()));
ksArray[2].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_3, value.getOriginalMessage(), value.getError()));
ksArray[3].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
OUTPUT_BINDING_4, value.getOriginalMessage(), value.getError()));
};
}
// send message(s) to topic or forward to dlq if there is a message handling exception
private void sendMessage(String key, List<String> transformedMessages, String binding, String originalMessage, String error) {
try {
for (String transformedMessage : transformedMessages) {
if (!transformedMessage.equals(CLASS_NOT_FOUND_EXCEPTION)) {
boolean sendTest = streamBridge.send(binding,
new GenericMessage<>(transformedMessage, Collections.singletonMap(
KafkaHeaders.KEY, (extractMessageId(transformedMessage)).getBytes())));
log.debug(String.format("message sent = %s", sendTest));
} else {
log.warn(String.format("message transform error: %s", error));
streamBridge.send(DLQ_OUTPUT_BINDING,
new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
key.getBytes())));
}
}
} catch (MessageHandlingException e) {
log.warn(String.format("message send error: %s", e));
streamBridge.send(DLQ_OUTPUT_BINDING,
new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
key.getBytes())));
}
}
我真正需要知道的是是否有更好的方法来执行这些要求? 如果没有,有没有办法检查我们发送到的外部 kafka 集群(我不管理它)的确认,以便如果没有收到消息可以重新发送?
Kafka Streams 不允许您从一个集群接收记录并在处理后将它们发布到不同的集群。单个拓扑中的所有处理都必须在同一个集群上完成。请参阅相关的 Stack Overflow thread。根据您的用例要求,解决此限制的方法是使用
StreamBridge,
KafkaTemplate
等手动将记录发送到第二个集群。虽然这并不完美,但在这种情况下这是一个可以接受的解决方案.但是,使用这种方法,您将失去 Kafka Streams 提供的任何端到端保证。例如,当您在同一个集群上运行整个拓扑时,Kafka Streams 会为您提供某些处理保证,例如恰好一次、至少一次等。如果您想保留那些 Kafka Streams 提供的保证,您可以使用以下策略:您愿意在第一个集群上使用另一个额外的主题。这是基本的想法。
public Function<KStream<String, String>, KStream<...>> generateMessage()
所以上面是一个在同一个集群上运行的端到端的 Kafka Streams 处理器。您将结果生成到集群上的出站主题中。然后,您使用常规的基于消息通道的 Kafka 活页夹 -
spring-cloud-stream-binder-kafka
将消息发送到第二个集群。
Function<String, String> passThroughToSecondCluster() {
}
您可以利用 Spring Cloud Stream 的多绑定器功能,在入站上使用第一个集群,在出站上使用第二个集群。这是一个example。查看configuration以获取更多详细信息。
通过这种方式,您可以获得 Kafka Streams 的端到端保证,然后通过一个单独的处理器,将记录发送到第二个集群。缺点,显然是你需要在第一个集群上有一个额外的主题。