Nodejs kafka消费者无限循环

问题描述 投票:1回答:1

我在ubuntu 16.04机器上运行kafka_2.11-2.0.0。创建了一个主题,并从命令行界面向其生成了一些消息。

enter image description here

从命令行开始消费,它消耗得很好。

enter image description here

但是当我开始像下面这样的nodejs消费者时,它会无限迭代。我的客户端代码中是否有任何遗漏?

var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(
    client,
    [
        {topic: 'mytopic', partition: 0}
    ],
    {
        autoCommit: true
    }
);
consumer.on('message', function (message) {
        console.log(message);
});
consumer.on('error', function (err){
        console.log(err);

})
consumer.on('offsetOutOfRange', function (err){
        console.log(err);
        process.exit();
})

这是输出。

{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: 'message2',
  offset: 1,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: 'message3',
  offset: 2,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: 'message2',
  offset: 1,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: 'message3',
  offset: 2,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
{ topic: 'mytopic',
  value: 'message2',
  offset: 1,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: 'message3',
  offset: 2,
  partition: 0,
  highWaterOffset: 3,
  key: null }
{ topic: 'mytopic',
  value: '',
  offset: 0,
  partition: 0,
  highWaterOffset: 3,
  key: '' }
node.js apache-kafka kafka-consumer-api mq
1个回答
0
投票

终于发现了kafka新版本2.0.0的问题。所以我转移到以前的版本,它现在正在工作。

© www.soinside.com 2019 - 2024. All rights reserved.