我的用户配置如下:
问题是,当我从测试主题(1个分区包含1000条消息)轮询我的数据时,每次轮询仅获得500条消息。每条消息大约是90个字节。此配置绝对应该足够高以处理所有数据。有什么理由吗?
public static KafkaConsumer<String, SpecificRecordBase> createConsumer(
Arguments args) {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.bootstrapServers);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, args.groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "4500");
// Data batching configuration
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
// Specify the number of bytes you want to read in batch
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, args.schemaRegistryUrl);
return new KafkaConsumer<>(properties);
}
.....
while (true) {
ConsumerRecords<String, SpecificRecordBase> records =
myConsumer.poll(Duration.ofSeconds(CONSUMER_POLL_SECONDS));
....
这里的记录数是500
编辑:
在文档中阅读,默认轮询计数为500。我需要哪个配置?我并不真正在乎消息数量,我在乎流式传输的字节数。
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500000000");
还有一个使用者配置属性max.poll.records
,您没有更改其默认值500。
如果您使用的是Java使用者,还可以调整max.poll.records以调整每次循环迭代处理的记录数。
参考:Confluent Kafka Consumer Properties
您可以在这里找到信息:
我记得我有一个类似的问题,但就我而言,问题是由字节限制之一引起的。