所有生成的 kafka 消息始终发送到分区 0,即使密钥哈希值为 1

问题描述 投票:0回答:1

我有一个带有 2 个分区的 Kafka 主题。我创建了一个带有 2 个不同键的消息处理程序。

   @Bean
@ServiceActivator(inputChannel = "pushDataRequestChannel")
public MessageHandler pushDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKey()));
    return handler;
}

@Bean
public DirectChannel processDataRequestChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "processDataRequestChannel")
public MessageHandler processDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
    KafkaProducerMessageHandler<String, String> handler =
            new KafkaProducerMessageHandler<>(kafkaTemplate);
    handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKeyOne()));
    return handler;
}
  1. 组ID
  2. partition_1_key

所以。我将向分区 0 发送消息,仅限奇数。对于事件编号,它应该转到分区 1。

这是我的逻辑

for(int i=0; i < 6 ; i++)
        {
            Predicate<Integer> isEven = n -> n % 2 == 0;

            // Check if the number is even or odd and display the result
            if (isEven.test(i)) {
                System.out.println(i + " is an even number.");
                MessageNeedToSend message = new MessageNeedToSend();
                ----
                ---
                ---
                pushDataRequestChannel.send(MessageBuilder
                        .withPayload(objectMapper.writeValueAsString(message))
                        .copyHeaders(headers)
                        .build());
            } else {
                MessageNeedToSend message = new MessageNeedToSend();
                ----
                ---
                ---
                processDataRequestChannel.send(MessageBuilder
                        .withPayload(objectMapper.writeValueAsString(message))
                        .copyHeaders(headers)
                        .build());
                System.out.println(i + " is an odd number.");
            }
        }

但是总是会分区0。

partition_1_key - 706172746974696F6E5F315F6B6579(字节数组) - 1550936367(杂音哈希2) - 1550936367 % 2 = 1 - (根据我的理解,它应该进入分区1)

group_id - 67726F75705F6964(字节数组)-- 2186850892(杂音哈希2)- 2186850892 % 2 = 0 - (根据我的理解,它应该进入分区0)

但它总是会分区0;

你能告诉我可能是什么原因吗

apache-kafka kafka-producer-api
1个回答
0
投票

ProducerRecord 允许您指定分区号,而不是使用两个处理程序。 使用 spring boot KafkaProducerMessageHandler ,您可以通过使用

partition id
指定
setPartitionIdExpression
来将消息发送到主题,而不是使用 key 来计算分区 id。 谢谢

© www.soinside.com 2019 - 2024. All rights reserved.