Kafka配置MAX_BYTES仍然受到500条消息的限制

问题描述 投票:0回答:1

我的用户配置如下:

问题是,当我从测试主题(1个分区包含1000条消息)轮询我的数据时,每次轮询仅获得500条消息。每条消息大约是90个字节。此配置绝对应该足够高以处理所有数据。有什么理由吗?

消费配置

    public static KafkaConsumer<String, SpecificRecordBase> createConsumer(
            Arguments args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, args.bootstrapServers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, args.groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "4500");

        // Data batching configuration
        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");

        // Specify the number of bytes you want to read in batch
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, args.schemaRegistryUrl);

        return new KafkaConsumer<>(properties);
    }

投票站

.....
            while (true) {
                ConsumerRecords<String, SpecificRecordBase> records =
                        myConsumer.poll(Duration.ofSeconds(CONSUMER_POLL_SECONDS));
....

这里的记录数是500

编辑:

在文档中阅读,默认轮询计数为500。我需要哪个配置?我并不真正在乎消息数量,我在乎流式传输的字节数。

        properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "500000000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500000000");
java apache-kafka
1个回答
0
投票

还有一个使用者配置属性max.poll.records,您没有更改其默认值500。

如果您使用的是Java使用者,还可以调整max.poll.records以调整每次循环迭代处理的记录数。

参考:Confluent Kafka Consumer Properties

您可以在这里找到信息:

我记得我有一个类似的问题,但就我而言,问题是由字节限制之一引起的。

© www.soinside.com 2019 - 2024. All rights reserved.