How to pass topics dynamically to a Kafka listener?

Question description  Votes: 0  Answers: 6

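For a few days now I have been trying to find a way to pass topics dynamically to a Kafka listener, rather than using them through keys from a Java DSL. Has anyone here done this before, or can anyone shed some light on the best way to achieve it?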

java apache-kafka kafka-consumer-api
6 Answers
15
votes

The simplest solution I found is to use SpEL:

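// "kafkaTopicNameProvider" in the SpEL expression is resolved as a bean name,
// so a bean with that name must exist in the application context.
@Autowired
private SomeBean kafkaTopicNameProvider;

// The expression is evaluated once, when the listener is initialized.
@KafkaListener(topics = "#{kafkaTopicNameProvider.provideName()}")
public void listener() { ... }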
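
The answer does not show what SomeBean looks like. As a minimal sketch, assuming the topic name comes from a JVM system property with a fallback default, a provider could look like the class below. The class name, constructor parameters, and property key are hypothetical; in a Spring application the class would additionally be registered as a bean named kafkaTopicNameProvider so that the SpEL expression can find it.

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the SomeBean type in the answer: it resolves the
 * topic name from a JVM system property and falls back to a default.
 */
class KafkaTopicNameProvider {

    private final String propertyKey;   // assumed property key, chosen by the caller
    private final String defaultTopic;  // used when the property is not set

    KafkaTopicNameProvider(String propertyKey, String defaultTopic) {
        this.propertyKey = Objects.requireNonNull(propertyKey);
        this.defaultTopic = Objects.requireNonNull(defaultTopic);
    }

    /** The method the SpEL expression "#{kafkaTopicNameProvider.provideName()}" would call. */
    public String provideName() {
        return System.getProperty(propertyKey, defaultTopic);
    }
}
```

Because the expression is evaluated only when the listener is initialized, changing the property afterwards would not move an already running listener to another topic.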

3
votes

You cannot "dynamically pass topics to a Kafka listener"; you have to create a listener container programmatically.


3
votes

Here is a working solution:

// Start a listener container programmatically, without the "@KafkaListener" annotation
Map<String, Object> consumerProps = consumerProps("my-srv1:9092", "my-group", "false");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
// The topic is an ordinary constructor argument, so it can be chosen at runtime
ContainerProperties containerProperties = new ContainerProperties("my-topic");
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<String, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String, String>) record -> {
    log.info("Message received: " + record);
    records.add(record);
});
container.start();

/**
 * Set up test properties for a {@code <String, String>} consumer.
 * @param brokersCommaSep the bootstrapServers property (comma separated servers).
 * @param group the group id.
 * @param autoCommit the auto commit.
 * @return the properties.
 */
public static Map<String, Object> consumerProps(String brokersCommaSep, String group, String autoCommit) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersCommaSep);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    // Keys are Strings, matching the <String, String> consumer factory above
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

Hope this helps.


3
votes

I built a Kafka listener that can be registered, de-registered, started, and stopped at runtime.

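public class KafkaListener {

    private final KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
    private final Map<String, MessageListenerContainer> registeredTopicMap = new ConcurrentHashMap<>();
    // The excerpt synchronizes on "lock" without declaring it; a plain monitor object is assumed here
    private final Object lock = new Object();

    public KafkaListener(final KafkaListenerContainerFactory<?> kafkaListenerContainerFactory) {
        this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
    }

    /** Registers a listener at runtime for every supplied topic that is not registered yet. */
    public void register(final Supplier<Set<String>> topicSupplier, final Supplier<MessageListener<?, ?>> messageListenerSupplier) {
        synchronized (lock) {
            final Set<String> registeredTopics = getRegisteredTopics();
            final Set<String> topics = topicSupplier.get();

            if (topics.isEmpty()) {
                return;
            }

            topics.stream()
                    .filter(topic -> !registeredTopics.contains(topic))
                    .forEach(topic -> doRegister(topic, messageListenerSupplier.get()));
        }
    }

    // Not shown in the excerpt; assumed to return the topics registered so far
    private Set<String> getRegisteredTopics() {
        return registeredTopicMap.keySet();
    }

    /** Creates a container for the topic, attaches the listener, and starts it. */
    private void doRegister(final String topic, final MessageListener<?, ?> messageListener) {
        final MessageListenerContainer messageListenerContainer = kafkaListenerContainerFactory.createContainer(topic);
        messageListenerContainer.setupMessageListener(messageListener);
        messageListenerContainer.start();

        registeredTopicMap.put(topic, messageListenerContainer);
    }
}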

Full source code: https://github.com/pkgonan/kafka-listener

To try it out, first run:

docker-compose up -d

Then call the API:

curl -XPOST /consumers/order/register .....

curl -XPOST /consumers/order/de-register .....

curl -XPOST /consumers/order/stop

curl -XPOST /consumers/order/start
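
The endpoints above imply de-register and stop operations that the excerpt does not show. The following is a plain-Java sketch of how registration and de-registration could fit together; it is not the linked project's code. StoppableContainer and TopicRegistry are hypothetical names, and the container is reduced to start and stop so that the example runs without Kafka.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical minimal view of a listener container: only start and stop. */
interface StoppableContainer {
    void start();
    void stop();
}

/** Plain-Java model of a runtime topic registry (an illustration, not a Spring Kafka API). */
class TopicRegistry {

    private final Map<String, StoppableContainer> containers = new ConcurrentHashMap<>();
    private final Function<String, StoppableContainer> containerFactory;

    TopicRegistry(Function<String, StoppableContainer> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /** Creates and starts a container for the topic unless one is already registered. */
    boolean register(String topic) {
        boolean[] created = {false};
        containers.computeIfAbsent(topic, t -> {
            StoppableContainer container = containerFactory.apply(t);
            container.start();
            created[0] = true;
            return container;
        });
        return created[0];
    }

    /** Stops the topic's container and forgets it; returns false if the topic was unknown. */
    boolean deRegister(String topic) {
        StoppableContainer container = containers.remove(topic);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Returns a snapshot of the currently registered topics. */
    Set<String> registeredTopics() {
        return new HashSet<>(containers.keySet());
    }
}
```

In the setup from the answer, the factory function would presumably wrap the container returned by kafkaListenerContainerFactory.createContainer(topic) after a message listener has been attached to it.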

1
votes

You can change the topic dynamically at runtime!

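@Component
public class StoppingErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        // Look up the container of the listener that was declared with id "fence"
        ConcurrentMessageListenerContainer<?, ?> listenerContainer =
                (ConcurrentMessageListenerContainer<?, ?>) kafkaListenerEndpointRegistry.getListenerContainer("fence");
        ContainerProperties cp = listenerContainer.getContainerProperties();
        // Overwrites the first topic in place; this only takes effect if getTopics()
        // returns the container's own array rather than a copy
        String[] topics = cp.getTopics();
        topics[0] = "gaonb";
        // Restart the container so that it subscribes to the new topic
        listenerContainer.stop();
        listenerContainer.start();
    }
}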

0
votes

If anyone wants to build a module that can be used as a library, take a look at https://github.com/313hemant313/TheGameKafka

Based on clientName we can obtain a KafkaProducer and a KafkaConsumer. Example usage:

TheGameKafkaProducer<String, String> theGameKafkaProducer = theGameKafkaProducerFactory.getTheGameKafkaProducer(
    "testClient");
theGameKafkaProducer.send("this is a test msg from TheGameKafkaProducer");

TheGameKafkaConsumer<String, String> theGameKafkaConsumer = theGameKafkaConsumerFactory.getTheGameKafkaConsumer(
    "testClient");
theGameKafkaConsumer.listen(messageListener());

private MessageListener<String, String> messageListener() {
  return rec -> log.info("TheGameKafkaConsumer listened : {}", rec);
}

Example configuration:

the-game-kafka:
  clientConsumerProperties:
    - clientName: "testClient"
      topic: "testTopic"
      enabled: true
      kafkaProperties:
        consumer:
          bootstrapServers: localhost:9094
          groupId: "tgk-group"
          autoOffset: earliest
          keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
          properties:
            spring:
              json:
                trusted:
                  packages: "*"
              deserializer:
                value:
                  delegate:
                    class: org.apache.kafka.common.serialization.StringDeserializer
  clientProducerProperties:
    - clientName: "testClient"
      topic: "testTopic"
      enabled: true
      kafkaProperties:
        producer:
          bootstrapServers: localhost:9094

Inspired by https://github.com/pkgonan/kafka-listener
