编辑 Kafka Listener Spring App 以更改 Stage/Target

问题描述 投票:0回答:1

我可以利用另一个运行 Kafka 应用程序/代码库的团队来使用相同的数据/将其加载到我们的新暂存表中,而不是他们的。他们在“消息”文件夹中有许多不同的 kafka 侦听器适配器 .java 文件,每个文件使用不同类型的数据。

每个 kafka listener 都有很多导入,一个公共类,实际插入数据的 try 语句等。我需要为每个 kafka listener adapter.java 文件编辑什么以将其他团队的阶段/目标表更改为我的?我发现我只需要更改“groupid”和“topic”变量+(查找具有此格式的文件以配置/添加新的groupid/id:spring.kafka.consumer.group-id=new-groupid spring.kafka.listener .id=新 ID)。这就是 .java 文件中两个变量的使用方式:

   @KafkaListener(topics = "#{new java.util.HashMap(${dataflow.consumer.props.assets.crypto}).keySet()}", groupId = "${dataflow.consumer.assets.crypto.group}",containerFactory = "cryptoListenerContainerFactory", id = "${dataflow.consumer.assets.crypto.group}")

每个 kafka 侦听器适配器都有一个存储库变量,所以我不确定。似乎数据实际上被插入到这个引用“存储库”的代码块中:

   try {
        List<FundEntity> cryptoFilteredList = cryptoConsumerRecordList.stream()
                .filter(cryptoConsumerRecord -> recordIsNotFilterable(cryptoConsumerRecord, FundEntity.class))
                .map(ConsumerRecord::value)
                .collect(Collectors.toList());

        if (!cryptoFilteredList.isEmpty()) {

            this.Repository.storecryptoCollection(cryptoFilteredList, topic, firstOffset, partition, cryptoGroupId);

            kafkaDetailsSummary.setStatus(KafkaMessageSummary.KafkaStatus.PROCESSED);
            log.info(LogHelper.generateCryptoReconMessage(cryptoFilteredList), keyValue(KAFKA_DETAILS, kafkaDetailsSummary));
        }
apache-kafka kafka-consumer-api spring-kafka
1个回答
0
投票

spring.kafka.listener.id
:没有这个属性,
spring.kafka.consumer.group-id
只有在注解上没有
groupId
属性的情况下才使用

由于他们在注释中使用属性占位符,您只需将

dataflow.consumer.assets.crypto.group
属性设置为您想要的值即可。

© www.soinside.com 2019 - 2024. All rights reserved.