kafka服务器和客户端jar移动到最新的库:0.10.0.1
我的消费者和生产者代码使用上面提到的最新kafka罐子,但仍使用旧的消费者api(0.8.2)。
我在调用commit offset时面临消费者方面的问题。
2017-04-10 00:07:14,547 ERROR kafka.consumer.ZookeeperConsumerConnector [groupid1_x.x.x.x-1491594443834-b0e2bce5], Error while committing offsets. java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerConnector.scala:354)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:351)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:331)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerConnector.scala:111)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.commitOffset(KafkaHLConsumer.java:173)
at com.zoho.mysqlbackup.kafka.consumer.KafkaHLConsumer.run(KafkaHLConsumer.java:271)
kafka服务器端配置:
listeners=PLAINTEXT://:9092
log.message.format.version=0.8.2.1
broker.id.generation.enable=false
unclean.leader.election.enable=false
以下为kafka消费者的配置:
auto.commit.enable is overridden to false
auto.offset.reset is overridden to smallest
consumer.timeout.ms is overridden to 100
dual.commit.enabled is overridden to true
fetch.message.max.bytes is overridden to 209715200
group.id is overridden to crm_topic1_hadoop_tables
offsets.storage is overridden to kafka
rebalance.backoff.ms is overridden to 6000
zookeeper.session.timeout.ms is overridden to 23000
zookeeper.sync.time.ms is overridden to 2000
创建消费者我在api下面使用:
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
并提交电话
consumer.commitOffsets();
在阅读来自kafka的消息时,我们使用以下方法来处理超时
private boolean hasNext(ConsumerIterator<byte[], byte[]> it)
{
try
{
it.hasNext();
return true;
}
catch (ConsumerTimeoutException e)
{
return false;
}
}
这是必需的,因为我们只想在从kafka收到的特定时间间隔或消息(字节)大小之后开始处理。
相同的异常,即使在设置了double.commit.enabled = false consumer.timeout.ms = 1000之后,其他设置保持为旧配置。
更多细节:
在版本0.8.2.1中,我从未遇到过这样的问题。移动到0.10.0.1(客户端和服务器)后,开始获取此异常。
在处理/推送到hadoop之前,我们正在阅读多条消息。处理/写入hadoop部分需要时间(约5分钟)。在这个过程之后,当我们试图推动时,我们正在超越异常。这个例外我每次第二次commitOffset都会遇到这种情况。有些时候(commitOffset使用前一次提交的10秒调用)第二次提交没有异常。
供您参考。如果提交偏移失败,那么消费者只需读取下一条消息而不返回上一次成功的提交偏移位置。但如果提交偏移失败并重新启动消费者进程,那么它将从旧提交位置读取。
正如我在问题细节中所提到的,我正在使用最新的kafka罐子,但是使用旧的消费者客户端:
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(propForHLConsumer));
我通过调用第二次commitOffset解决了这个问题。
实际上是与connections.max.idle.ms有关的问题。这个属性是最新的kafka(经纪人= 10分钟,消费者= 9分钟,生产者= 9分钟)。
因此,每当我的旧消费者在10分钟后调用第二次提交偏移时,我就会超过异常。
使用旧的使用者API,无法设置此属性。和代理配置我无法更改(由其他团队处理并为其他用户提供相同的代理)...
在这里,我认为旧的commitOffset调用需要另一个连接(除了迭代器),并且当它的理想超过10分钟时,该连接正在接近。我对此不太确定。
如果第一次commitOffset调用发生任何故障,那么第二次调用将确保获得成功。如果第一个本身获得成功,那么下一个执行将不会产生任何问题。无论如何,我们几乎没有调用commit offset。
接下来,我将使用最新的kafka使用者和生产者Java API移动我的代码。