Creating distributed consumers for a consumer group with Node.js + Kafka


I am using Node.js 10 + Apache Kafka 2.3 + the no-kafka npm package.

Currently, I have created a topic with a replication factor of 3 and 3 partitions. I have 3 Kafka brokers running on 3 different ports.

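Using no-kafka, I can see that 3 consumers are created, one per partition, and that all of them reside on the same machine. Below are the code and a snapshot of the running setup.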

CODE:

var Kafka = require('no-kafka');
var Promise = require('bluebird');

// One group consumer, bootstrapped from all three brokers
var consumer = new Kafka.GroupConsumer({
    connectionString: 'kafka://192.168.1.172:9092, kafka://192.168.1.172:9093, kafka://192.168.1.172:9094'
  });

// Called with a batch of messages fetched from one topic partition
var dataHandler = function (messageSet, topic, partition) {
    // Handle messages one after another, committing each offset after logging it
    return Promise.each(messageSet, function (m){
        console.log("Topic: " + topic, ", Partition: " + partition, ", Offset: " + m.offset,
            ", Message: " + m.message.value.toString('utf8'));
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

// Subscribe to topic 'test' and route its messages to dataHandler
var strategies = [{
    subscriptions: ['test'],
    handler: dataHandler
}];

consumer.init(strategies);

When I create a producer and run it, I get the following on the console:

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092 --topic test
>hey
>there
>how are you
>I am
>fine
>and
>how
>about
>you

Below is the consumer output:

PS D:\checkout\javascript\sample projects\kafka> node .\consumer.js
2019-12-23T15:43:07.822Z INFO no-kafka-client Joined group no-kafka-group-v0.9 generationId 45 as no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7
2019-12-23T15:43:07.822Z INFO no-kafka-client Elected as group leader
2019-12-23T15:43:07.839Z DEBUG no-kafka-client Subscribed to test:0 offset 57 leader 192.168.1.172:9094
2019-12-23T15:43:07.840Z DEBUG no-kafka-client Subscribed to test:1 offset 56 leader 192.168.1.172:9094
2019-12-23T15:43:07.841Z DEBUG no-kafka-client Subscribed to test:2 offset 58 leader 192.168.1.172:9094
Topic: test , Partition: 2 , Offset: 58 , Message: hey
Topic: test , Partition: 1 , Offset: 56 , Message: there
Topic: test , Partition: 0 , Offset: 57 , Message: how are you
Topic: test , Partition: 1 , Offset: 57 , Message: fine
Topic: test , Partition: 2 , Offset: 59 , Message: I am
Topic: test , Partition: 0 , Offset: 58 , Message: and
Topic: test , Partition: 2 , Offset: 60 , Message: how
Topic: test , Partition: 0 , Offset: 59 , Message: you
Topic: test , Partition: 1 , Offset: 58 , Message: about

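Everything works as expected for one consumer:

1. One consumer is created automatically.
2. Messages are distributed across the topic's partitions in round-robin fashion.
3. The consumers are spread evenly over the 3 partitions for load balancing, but all on the same machine.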
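Point 2 can be modeled as a counter that cycles through the partitions. The sketch below is a simplified illustration, assuming keyless messages (the console producer sends no key by default); it approximates what Kafka's default partitioner did for keyless records before version 2.4, but the real starting partition and partition order are implementation details, so it does not reproduce the exact 2, 1, 0 order in the output above. `createRoundRobinPartitioner` is a hypothetical helper, not a no-kafka or Kafka API.

```javascript
// Hypothetical helper: a simplified round-robin partitioner for messages
// without a key. It models the idea only; it is not Kafka's actual code.
function createRoundRobinPartitioner(numPartitions, start = 0) {
  let counter = start;
  // Each call returns the next partition in the cycle 0, 1, ..., numPartitions - 1
  return function nextPartition() {
    const partition = counter % numPartitions;
    counter += 1;
    return partition;
  };
}

// Usage: place the nine messages typed into the console producer above.
const messages = ['hey', 'there', 'how are you', 'I am', 'fine', 'and', 'how', 'about', 'you'];
const next = createRoundRobinPartitioner(3);
const placement = messages.map((value) => ({ value, partition: next() }));
console.log(placement);
```

Each partition receives every third message, which is why the three partitions in the consumer output advance in step.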

When I inspect the consumer group state with the script that ships with Kafka, the following is printed on the console:

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group no-kafka-group-v0.9 --bootstrap-server 192.168.1.172:9093

GROUP                TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                           HOST           CLIENT-ID
no-kafka-group-v0.9  test   0          60              60              0    no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7  /192.168.1.48  no-kafka-client
no-kafka-group-v0.9  test   1          59              59              0    no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7  /192.168.1.48  no-kafka-client
no-kafka-group-v0.9  test   2          61              61              0    no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7  /192.168.1.48  no-kafka-client
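Grouping these rows by CONSUMER-ID shows how the partitions are held. A minimal sketch with the three rows typed in by hand (parsing the CLI's text output is left out); `summarizeByConsumer` is a hypothetical helper, and LAG is computed as LOG-END-OFFSET minus CURRENT-OFFSET, as in the table:

```javascript
// Rows copied from the kafka-consumer-groups.sh --describe output above
const rows = [
  { partition: 0, currentOffset: 60, logEndOffset: 60, consumerId: 'no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7' },
  { partition: 1, currentOffset: 59, logEndOffset: 59, consumerId: 'no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7' },
  { partition: 2, currentOffset: 61, logEndOffset: 61, consumerId: 'no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7' },
];

// Collect the partitions owned by each CONSUMER-ID and sum their lag
function summarizeByConsumer(rows) {
  const summary = {};
  for (const row of rows) {
    if (!summary[row.consumerId]) {
      summary[row.consumerId] = { partitions: [], totalLag: 0 };
    }
    const entry = summary[row.consumerId];
    entry.partitions.push(row.partition);
    // LAG = LOG-END-OFFSET - CURRENT-OFFSET
    entry.totalLag += row.logEndOffset - row.currentOffset;
  }
  return summary;
}

const summary = summarizeByConsumer(rows);
console.log(summary);
```

Every row carries the same CONSUMER-ID, so all three partitions are held by one group member on host /192.168.1.48.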

Question:

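The only problem is that the consumers are on the same machine. I want them distributed across different machines for load balancing and to make proper use of the resources of each machine.

Is there a way to achieve this?

Note: I am restricted to using Node.js.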
Answer:
I solved this problem using the kafkajs npm package.

Note: as in the question above, the producer is run from the console.

    CODE:

const { Kafka } = require('kafkajs')

// Client bootstrapped from all three brokers
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.1.172:9092', '192.168.1.172:9093', '192.168.1.172:9094']
})

// Every process started with this groupId joins the same consumer group
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'test', fromBeginning: true })
  // Called once per message from whichever partitions this member is assigned
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })
}

run().catch(console.error)
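Nothing in this script ties a consumer to one machine: separate processes share the partitions because they use the same groupId and can reach the brokers. To start the same file on several hosts, the connection details could be read from the environment instead of being hard-coded. A minimal sketch, assuming hypothetical variable names (KAFKA_BROKERS, KAFKA_GROUP_ID, KAFKA_CLIENT_ID, KAFKA_TOPIC) that are not defined by kafkajs; the returned objects would be passed to `new Kafka(...)`, `kafka.consumer(...)` and `consumer.subscribe(...)` in the script above.

```javascript
// Hypothetical environment variable names, chosen for illustration only.
// Defaults mirror the hard-coded values in the kafkajs script above.
function buildConsumerSettings(env) {
  const brokers = (env.KAFKA_BROKERS || '192.168.1.172:9092,192.168.1.172:9093,192.168.1.172:9094')
    .split(',')
    .map((b) => b.trim())
    .filter((b) => b.length > 0);
  return {
    // Options for new Kafka({...})
    client: { clientId: env.KAFKA_CLIENT_ID || 'my-app', brokers },
    // Options for kafka.consumer({...}); every host must use the same groupId
    consumer: { groupId: env.KAFKA_GROUP_ID || 'test-group' },
    // Topic for consumer.subscribe({...})
    topic: env.KAFKA_TOPIC || 'test',
  };
}

console.log(buildConsumerSettings(process.env));
```

Running the script on another machine with the same KAFKA_GROUP_ID makes that process one more member of the group, and the brokers rebalance the partitions across all members.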

Single terminal:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.613Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.625Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:19:02.964Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2019-12-24T03:19:03.015Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[0,1,2]},"groupProtocol":"RoundRobinAssigner","duration":48}
{"level":"ERROR","timestamp":"2019-12-24T03:19:03.620Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{ partition: 0, offset: '107', value: 'fgh' }
{ partition: 2, offset: '109', value: '' }
{ partition: 1, offset: '108', value: 'asdsa' }

Two concurrent terminals:

When I open another terminal and run the same command there, I get the following console output:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.229Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.236Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":257}
{"level":"INFO","timestamp":"2019-12-24T03:22:21.530Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:22.236Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}

Now that a new consumer has been added through the second terminal, the first consumer reports the change with the following log line:

{"level":"INFO","timestamp":"2019-12-24T03:22:26.023Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":39}
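The logs report `groupProtocol: RoundRobinAssigner`. The split can be reproduced with a simplified round-robin model: sort the member IDs, then deal the partitions out in turn. This is an illustrative sketch, not kafkajs's implementation (a real assigner handles several topics and other details), and `assignRoundRobin` is a hypothetical helper. Fed the member IDs from the logs, it gives [0, 2] to my-app-5d944889 and [1] to my-app-b1fec592, matching the log lines, and it also matches the three-member assignment in the logs that follow.

```javascript
// Simplified model of round-robin assignment for one topic (not kafkajs's source):
// sort the member IDs, sort the partitions, then hand them out in turn.
function assignRoundRobin(memberIds, partitions) {
  const members = [...memberIds].sort();
  const assignment = {};
  for (const id of members) {
    assignment[id] = [];
  }
  if (members.length === 0) {
    return assignment;
  }
  const sortedPartitions = [...partitions].sort((a, b) => a - b);
  sortedPartitions.forEach((partition, i) => {
    assignment[members[i % members.length]].push(partition);
  });
  return assignment;
}

// Member IDs copied from the kafkajs logs
const first = 'my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa';
const second = 'my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13';
const third = 'my-app-499da929-d351-4e59-94c9-88a18e97999d';

console.log(assignRoundRobin([first, second], [0, 1, 2]));
console.log(assignRoundRobin([first, second, third], [0, 1, 2]));
```

Because the order depends only on the sorted member IDs, which member is the group leader has no bearing on who receives which partition in this model.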

Three concurrent terminals:

Keeping the previous terminals open, I now open a third terminal; its console output is below:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.516Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.528Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":273}
{"level":"INFO","timestamp":"2019-12-24T03:28:07.865Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:08.523Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.803Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-499da929-d351-4e59-94c9-88a18e97999d","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0]},"groupProtocol":"RoundRobinAssigner","duration":3937}

The second terminal logs the rebalance as follows:

{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.720Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"192.168.1.172:9093","clientId":"my-app","error":"The group is rebalancing, so a rejoin is needed","correlationId":144,"size":10}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.725Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.801Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":70}

The first terminal logs the rebalance as follows:

{"level":"ERROR","timestamp":"2019-12-24T03:28:11.750Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":337}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.797Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[2]},"groupProtocol":"RoundRobinAssigner","duration":40}
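These excerpts describe a single rebalance: when the third member joins, the existing members are told to re-join and each receives a new assignment. Lining up the assignments logged before and after shows the net movement of partitions. A small sketch with a hypothetical `diffAssignments` helper; member IDs are shortened to their first block for readability, and the result is only the net change, not every intermediate step of the rebalance protocol.

```javascript
// Compare two assignments ({ memberId: [partitions] }) and report, per member,
// which partitions it lost and which it gained.
function diffAssignments(before, after) {
  const members = new Set([...Object.keys(before), ...Object.keys(after)]);
  const changes = {};
  for (const id of members) {
    const oldParts = before[id] || [];
    const newParts = after[id] || [];
    changes[id] = {
      revoked: oldParts.filter((p) => !newParts.includes(p)),
      assigned: newParts.filter((p) => !oldParts.includes(p)),
    };
  }
  return changes;
}

// Assignments taken from the kafkajs log lines (member IDs shortened)
const beforeThird = { '5d944889': [0, 2], 'b1fec592': [1] };
const afterThird = { '499da929': [0], '5d944889': [1], 'b1fec592': [2] };
console.log(diffAssignments(beforeThird, afterThird));
```

Every partition changes owner here, so each of the three consumers briefly stops and resumes from the committed offsets of its new partition.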

All consumers up and running:

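After flooding the topic with producer events, below is a snapshot of the consumers listening on their respective partitions. Each consumer now listens to one specific partition of the topic, which is great, and the consumers can now run on different machines to achieve parallelism.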

[Screenshot: the three consumer terminals, each printing messages from a single partition]
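Within one consumer group a partition is consumed by at most one member at a time, so the 3 partitions of `test` cap this group at 3 busy consumers, whichever machines they run on; a fourth member would sit idle until a rebalance handed it a partition. A quick sketch of the per-member load under a round-robin split (`partitionsPerConsumer` is a hypothetical helper):

```javascript
// Number of partitions each of consumerCount members would own when
// partitionCount partitions are dealt out round-robin. Members beyond the
// partition count receive nothing.
function partitionsPerConsumer(partitionCount, consumerCount) {
  const counts = new Array(consumerCount).fill(0);
  for (let p = 0; p < partitionCount; p++) {
    counts[p % consumerCount] += 1;
  }
  return counts;
}

// Usage: the topic 'test' has 3 partitions
for (const consumers of [1, 2, 3, 4]) {
  console.log(consumers, partitionsPerConsumer(3, consumers));
}
```

To scale beyond three parallel consumers, the topic would need more partitions.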

Below is the consumer-to-partition mapping reported by running ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093:

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093

GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                  HOST           CLIENT-ID
test-group  test   2          126             126             0    my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa  /192.168.1.48  my-app
test-group  test   1          124             124             0    my-app-945d6f38-bcda-4f02-b1a2-325957db5846  /192.168.1.48  my-app
test-group  test   0          124             124             0    my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13  /192.168.1.48  my-app

Shutting down one consumer and the resulting rebalance:

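Now, if I shut down one consumer, for example the one in the third terminal, the snapshot below shows the rebalance that redistributes the topic's 3 partitions between the 2 remaining consumers: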

[Screenshot: the rebalance across the two remaining consumers]

Below is the consumer-to-partition mapping reported by running ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093:

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093

GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                  HOST           CLIENT-ID
test-group  test   0          123             123             0    my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13  /192.168.1.48  my-app
test-group  test   2          125             125             0    my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13  /192.168.1.48  my-app
test-group  test   1          123             123             0    my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa  /192.168.1.48  my-app
