consumer.createMessageStreams(map)方法是顺序读取还是以某种批量读取

问题描述 投票:0回答:1

我对卡夫卡很新。我们正在编写当前应用程序中的使用者,该应用程序从一个主题中消耗并对所消耗的数据进行一些处理。我想明白,当我在下面的代码片段写下时,内部会发生什么。

它按预期工作,消耗数据并进行处理,但只是想知道如何从主题中读取数据。

createMessageStreams方法是从主题中顺序读取数据还是读取特定数量的批处理并处理它们?

Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(map);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
apache-kafka kafka-consumer-api
1个回答
1
投票

首先,请注意ConsumerConnectorkafka.consumer.KafkaStream类在kafka v#0.11.0版本中已弃用。如果您使用的是旧版本,则应计划升级到至少v#1.0或更高版本的新版本。

createMessageStreams方法是从主题中顺序读取数据还是读取特定数量的批处理并处理它们?

.createMessageStreams返回主题和KafkaStream对列表的地图。 (topic,list#stream)每个流都支持主题的消息或元数据对上的迭代器。它仅在分区内按顺序读取数据。如果您的分区数多于流线程数,则一个线程可以从多个分区读取。但仅在分区内,序列顺序得到保证。

  for (final KafkaStream<byte[], byte[]> stream : streamList) 
    {
       ConsumerIterator<byte[], byte[]> it= stream.iterator();
       while (it.hasNext()) 
       {
          String message = new String(it.next().message());
          System.out.println(message);
        }
      }
}

v#0.11以后的等效功能是.poll()方法。您可以设置max.poll.recordsmax.poll.interval.ms以分别设置每个轮询请求和间隔持续时间的记录数。

你可以在这里找到新的消费者:https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

© www.soinside.com 2019 - 2024. All rights reserved.