我有一个带有 2 个分区的 Kafka 主题。我创建了一个带有 2 个不同键的消息处理程序。
@Bean
@ServiceActivator(inputChannel = "pushDataRequestChannel")
public MessageHandler pushDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKey()));
return handler;
}
@Bean
public DirectChannel processDataRequestChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "processDataRequestChannel")
public MessageHandler processDataRequestHandler(final KafkaTemplate<String, String> kafkaTemplate) {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setMessageKeyExpression(new LiteralExpression(topicProperties.getProducerKeyOne()));
return handler;
}
所以。我将向分区 0 发送消息,仅限奇数。对于事件编号,它应该转到分区 1。
这是我的逻辑
for(int i=0; i < 6 ; i++)
{
Predicate<Integer> isEven = n -> n % 2 == 0;
// Check if the number is even or odd and display the result
if (isEven.test(i)) {
System.out.println(i + " is an even number.");
MessageNeedToSend message = new MessageNeedToSend();
----
---
---
pushDataRequestChannel.send(MessageBuilder
.withPayload(objectMapper.writeValueAsString(message))
.copyHeaders(headers)
.build());
} else {
MessageNeedToSend message = new MessageNeedToSend();
----
---
---
processDataRequestChannel.send(MessageBuilder
.withPayload(objectMapper.writeValueAsString(message))
.copyHeaders(headers)
.build());
System.out.println(i + " is an odd number.");
}
}
但是总是会分区0。
partition_1_key - 706172746974696F6E5F315F6B6579(字节数组) - 1550936367(杂音哈希2) - 1550936367 % 2 = 1 - (根据我的理解,它应该进入分区1)
group_id - 67726F75705F6964(字节数组)-- 2186850892(杂音哈希2)- 2186850892 % 2 = 0 - (根据我的理解,它应该进入分区0)
但它总是会分区0;
你能告诉我可能是什么原因吗
ProducerRecord 允许您指定分区号,而不是使用两个处理程序。 使用 spring boot KafkaProducerMessageHandler ,您可以通过使用
partition id
指定 setPartitionIdExpression
来将消息发送到主题,而不是使用 key 来计算分区 id。
谢谢