加密的kafka消息似乎总是去同一个分区

问题描述 投票:1回答:2

我的节点应用程序使用kafka-node节点模块。

我有一个带有三个分区的kafka主题,如下所示:

Topic: NotifierTemporarye3df:/opPartitionCount: 3in$ kafReplicationFactor: 3ibe Configs: segment.bytes=1073741824 --topic NotifierTemporary
    Topic: NotifierTemporary        Partition: 0    Leader: 1001    Replicas: 1001,1003,1002        Isr: 1001,1003,1002
    Topic: NotifierTemporary        Partition: 1    Leader: 1002    Replicas: 1002,1001,1003        Isr: 1002,1001,1003
    Topic: NotifierTemporary        Partition: 2    Leader: 1003    Replicas: 1003,1002,1001        Isr: 1003,1002,1001

当我为主题编写一系列键式消息时,它们似乎都被写入相同的分区。我希望将某些不同的键控消息发送到分区1和2。

这是我的使用者onMessage事件函数的日志输出,其中包含几条消息:

the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":345,"partition":0,"highWaterOffset":346,"key":"66","timestamp":"2020-03-19T00:16:57.783Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":346,"partition":0,"highWaterOffset":347,"key":"222","timestamp":"2020-03-19T00:16:57.786Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":347,"partition":0,"highWaterOffset":348,"key":"13","timestamp":"2020-03-19T00:16:57.791Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":348,"partition":0,"highWaterOffset":349,"key":"316","timestamp":"2020-03-19T00:16:57.798Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":446,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":349,"partition":0,"highWaterOffset":350,"key":"446","timestamp":"2020-03-19T00:16:57.806Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":66,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":350,"partition":0,"highWaterOffset":351,"key":"66","timestamp":"2020-03-19T00:17:27.918Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":222,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":351,"partition":0,"highWaterOffset":352,"key":"222","timestamp":"2020-03-19T00:17:27.920Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":13,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":352,"partition":0,"highWaterOffset":353,"key":"13","timestamp":"2020-03-19T00:17:27.929Z"}
the message is: {"topic":"NotifierTemporary","value":"{\"recipient\":316,\"subject\":\"download complete\",\"message\":\"s3/123.jpg\"}","offset":353,"partition":0,"highWaterOffset":354,"key":"316","timestamp":"2020-03-19T00:17:27.936Z"}

这里是发送消息的kafka节点生产者代码:

  * @description Adds a notification message to the Kafka topic that is not saved in a database.
  * @param {Int} recipientId - accountId of recipient of notification message
  * @param {Object} message - message payload to send to recipient
  */
  async sendTemporaryNotification(recipientId, subject, message) {
    const notificationMessage = {
      recipient: recipientId,
      subject,
      message,
    };
    // we need to validate this message schema - this will throw if invalid
    Joi.assert(notificationMessage, NotificationMessage);
    // partition based on the recipient
    const payloads = [
      { topic: KAFKA_TOPIC_TEMPORARY, messages: JSON.stringify(notificationMessage), key: notificationMessage.recipient },
    ];
    if (this.isReady) {
      await this.producer.sendAsync(payloads);
    }
    else {
      throw new ProducerNotReadyError('Notifier Producer not ready');
    }
  }
}

如您所见,它们都不来自分区1和2。即使在持续不断地使用随机整数键发送消息几分钟后,也是如此。我可能做错了什么?

apache-kafka kafka-consumer-api kafka-producer-api
2个回答
2
投票
// Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0 new Producer(client, {paritionerType: 3});

请参阅文档:https://www.npmjs.com/package/kafka-node#producerkafkaclient-options-custompartitioner


1
投票
在此示例中,我基于用户id向用户发送通知消息。这是对邮件进行分区的关键。

const { KafkaClient, HighLevelProducer, KeyedMessage } = require('kafka-node'); const Promise = require('bluebird'); const NotificationMessage = require(__dirname + '/../models/notificationMessage.js'); const ProducerNotReadyError = require(__dirname + '/../errors/producerNotReadyError.js'); const Joi = require('@hapi/joi'); const KAFKA_TOPIC_TEMPORARY = 'NotifierTemporary'; /** * @classdesc Producer that sends notification messages to Kafka. * @class */ class NotifierProducer { /** * Create NotifierProducer. * @constructor * @param {String} kafkaHost - address of kafka server */ constructor(kafkaHost) { const client = Promise.promisifyAll(new KafkaClient({kafkaHost})); const producerOptions = { partitionerType: HighLevelProducer.PARTITIONER_TYPES.keyed, // this is a keyed partitioner }; this.producer = Promise.promisifyAll(new HighLevelProducer(client, producerOptions)); this.isReady = false; this.producer.on('ready', async () => { await client.refreshMetadataAsync([KAFKA_TOPIC_TEMPORARY]); console.log('Notifier Producer is operational'); this.isReady = true; }); this.producer.on('error', err => { console.error('Notifier Producer error: ', err); this.isReady = false; }); } /** * @description Adds a notification message to the Kafka topic that is not saved in a database. * @param {Int} recipientId - accountId of recipient of notification message * @param {String} subject - subject header of the message * @param {Object} message - message payload to send to recipient */ async sendTemporaryNotification(recipientId, subject, message) { const notificationMessage = { recipient: recipientId, subject, message, }; // we need to validate this message schema - this will throw if invalid Joi.assert(notificationMessage, NotificationMessage); // partition based on the recipient const messageKM = new KeyedMessage(notificationMessage.recipient, JSON.stringify(notificationMessage)); const payloads = [ { topic: KAFKA_TOPIC_TEMPORARY, messages: messageKM, key: notificationMessage.recipient }, ]; if (this.isReady) { await this.producer.sendAsync(payloads); } else { throw new ProducerNotReadyError('Notifier Producer not ready'); } } } /** * Kafka topic that the producer and corresponding consumer will use to send temporary messages. * @type {string} */ NotifierProducer.KAFKA_TOPIC_TEMPORARY = KAFKA_TOPIC_TEMPORARY; module.exports = NotifierProducer;

© www.soinside.com 2019 - 2024. All rights reserved.