Spring Cloud StreamBridge to Kafka delivery check

问题描述 投票:0回答:1

我对自己拥有的spring cloud stream有几点要求:

  • 它需要从一个集群上的单个 Kafka 主题获取 KStream,并将消息发送到另一个集群上的多个主题。
  • 在某些情况下,它需要根据已收到的一条消息发送多条消息。
  • 这些消息都需要至少接收一次。

我已经研究过使用一个函数,但我无法解决如何在给定一个主题的情况下发送多条消息,我也研究过使用消费者和供应商,但我看不到它工作得很好。我目前发送消息的方式是使用消费者,然后使用 StreamBridge 通过副作用发送。

    @Bean
    @SuppressWarnings("unchecked")
    public Consumer<KStream<String, String>> generateMessage() {
        return messages -> {
            final Map<String, KStream<String, String>> splitMessages =
                    branchOutput(filterMessages(messages));

            KStream<String, MessageData>[] ksArray = splitMessages
                    .values()
                    .stream()
                    .map(message ->
                            message.mapValues((key, jsonMessage) -> {
                                try {
                                    return new MessageData(dataTransformService
                                            .transformMessage(key, jsonMessage, extractTopic(jsonMessage)),
                                            removeTopic(jsonMessage), "");
                                } catch (ClassNotFoundException e) {
                                    return new MessageData(Collections.singletonList(CLASS_NOT_FOUND_EXCEPTION),
                                            removeTopic(jsonMessage), e.getMessage());
                                }
                            }))
                    .toArray(KStream[]::new);

            ksArray[0].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_1, value.getOriginalMessage(), value.getError()));
            ksArray[1].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_2, value.getOriginalMessage(), value.getError()));
            ksArray[2].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_3, value.getOriginalMessage(), value.getError()));
            ksArray[3].peek((key, value) -> sendMessage(key, value.getTransformedMessages(),
                    OUTPUT_BINDING_4, value.getOriginalMessage(), value.getError()));
        };
    }

    // send message(s) to topic or forward to dlq if there is a message handling exception
    private void sendMessage(String key, List<String> transformedMessages, String binding, String originalMessage, String error) {
        try {
            for (String transformedMessage : transformedMessages) {
                if (!transformedMessage.equals(CLASS_NOT_FOUND_EXCEPTION)) {
                    boolean sendTest = streamBridge.send(binding,
                            new GenericMessage<>(transformedMessage, Collections.singletonMap(
                                    KafkaHeaders.KEY, (extractMessageId(transformedMessage)).getBytes())));

                    log.debug(String.format("message sent = %s", sendTest));

                } else {
                    log.warn(String.format("message transform error: %s", error));
                    streamBridge.send(DLQ_OUTPUT_BINDING,
                            new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
                                    key.getBytes())));
                }
            }

        } catch (MessageHandlingException e) {
            log.warn(String.format("message send error: %s", e));
            streamBridge.send(DLQ_OUTPUT_BINDING,
                    new GenericMessage<>(originalMessage, Collections.singletonMap(KafkaHeaders.KEY,
                            key.getBytes())));

        }
    }

我真正需要知道的是是否有更好的方法来执行这些要求? 如果没有,有没有办法检查我们发送到的外部 kafka 集群(我不管理它)的确认,以便如果没有收到消息可以重新发送?

apache-kafka spring-kafka spring-cloud-stream spring-cloud-dataflow
1个回答
0
投票

Kafka Streams 不允许您从一个集群接收记录并在处理后将它们发布到不同的集群。单个拓扑中的所有处理都必须在同一个集群上完成。请参阅相关的 Stack Overflow thread。根据您的用例要求,解决此限制的方法是使用

StreamBridge,
KafkaTemplate
等手动将记录发送到第二个集群。虽然这并不完美,但在这种情况下这是一个可以接受的解决方案.但是,使用这种方法,您将失去 Kafka Streams 提供的任何端到端保证。例如,当您在同一个集群上运行整个拓扑时,Kafka Streams 会为您提供某些处理保证,例如恰好一次、至少一次等。如果您想保留那些 Kafka Streams 提供的保证,您可以使用以下策略:您愿意在第一个集群上使用另一个额外的主题。这是基本的想法。

public Function<KStream<String, String>, KStream<...>> generateMessage() 

所以上面是一个在同一个集群上运行的端到端的 Kafka Streams 处理器。您将结果生成到集群上的出站主题中。然后,您使用常规的基于消息通道的 Kafka 活页夹 -

spring-cloud-stream-binder-kafka
将消息发送到第二个集群。

Function<String, String> passThroughToSecondCluster() {
}

您可以利用 Spring Cloud Stream 的多绑定器功能,在入站上使用第一个集群,在出站上使用第二个集群。这是一个example。查看configuration以获取更多详细信息。

通过这种方式,您可以获得 Kafka Streams 的端到端保证,然后通过一个单独的处理器,将记录发送到第二个集群。缺点,显然是你需要在第一个集群上有一个额外的主题。

© www.soinside.com 2019 - 2024. All rights reserved.