Spring 将 Java DSL 与 Kafka 集成

问题描述 投票:0回答:1

我正在用kafka spring集成Java DSL做一个poc 我正在从数据库(DB)读取一行并将该行作为消息发送到 Kafka 主题。请找到下面的代码。 代码正在编译,我可以从数据库中获取记录,但我没有在主题中看到任何消息。

@Configuration
public class KafkaProduceConfig {
 
 @Bean
    public IntegrationFlow pollingAdapterFlow(EntityManagerFactory entityManagerFactory, MyTransformer transformer) {
        return IntegrationFlow
                .from(Jpa.inboundAdapter(entityManagerFactory).entityClass(MyRecord.class),                 
                        e -> e.poller(p -> p.cron("*/1 * * * * *").maxMessagesPerPoll(1).transactional())
                                  .autoStartup(true))
                .log(message -> "Polled DB Records from KafkaProduceConfig : " + message.getPayload())
                .split()
                .log(message -> "Record after split : " + message.getPayload())
                .enrichHeaders(hrdSpec ->hrdSpec.headerExpression("myRecord", "payload",true))
                .transform(transformer,"getCustomeRecord")
                .enrichHeaders(hrdSpec ->hrdSpec.headerExpression("customeRecord","payload",true))
                .log(message -> "Transformed Record : " + message.getPayload() +",topic :" +message.getHeaders().get("topic"))
                .channel("sendToKafka")
                .get();
    }


    @Bean
    public IntegrationFlow outboundChannelAdapterFlow() {
        return IntegrationFlow.from("sendToKafka")
                .log(message -> "outboundChannelAdapterFlow received payload : " + message.getPayload() +",topic :"
                        +message.getHeaders().get("topic")+"key :"+message.getHeaders().get("key"))
                .handle(m->Kafka.outboundChannelAdapter(producerFactory()).topic(m.getHeaders().get("topic").toString())
                        .messageKey(m.getHeaders().get("key").toString())
                       // .headerMapper(mapper())
                        .partitionId((Integer) m.getHeaders().get("partitionId")))
                .get();

    }


    public ProducerFactory<Integer, String> producerFactory() {
    
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new DefaultKafkaProducerFactory<>(props);
    }   
}

消息应发布到 Kafka 主题。

spring spring-boot spring-integration spring-kafka spring-integration-dsl
1个回答
0
投票

配置

.handle(m->Kafka.outboundChannelAdapter(producerFactory())
不正确。该 lambda 创建一个新的
MessageHandler
,其主体只是在新消息到达时使用该
Kafka
工厂。此代码只是不处理此消息。

您必须研究一个

handle()
变体,其中您提供的是工厂提供的
MessageHandler
,而不是 lambda 的新产品。

所以,像这样:

        .handle(Kafka.outboundChannelAdapter(producerFactory())
                    .topic(m -> m.getHeaders().get("topic").toString())
                    .messageKey(m -> m.getHeaders().get("key").toString())
               // .headerMapper(mapper())
                    .partitionId(m -> (Integer) m.getHeaders().get("partitionId")))

这样,

MessageHandler
将在配置阶段创建。在运行时,将根据请求消息调用其
handleMessage()
方法。所有这些选项现在都是在运行时调用的 lambda。

附注请编辑您的问题以获得更易读的代码片段。

© www.soinside.com 2019 - 2024. All rights reserved.