Gradually increasing memory usage in Kafka producer and consumer

Problem description

I'm facing a memory issue with Kafka producers and consumers; our applications are deployed on OpenShift. Memory usage of both the producer and the consumer application keeps increasing. The load is about 4,320,000 messages per day; after tuning some configuration it grows more slowly, but the problem persists.

Kafka producer code and settings

Main logic

for (int i = 0; i < alarmList.size(); i++) {
    int extractUniqueHashCode = eventMsgId.hashCode();
    avromsg avromsg = eventToEventAvro(alarmList.get(i));
    ProducerRecord<String, avromsg> record =
                    new ProducerRecord<String, avromsg>(topicName, String.valueOf(extractUniqueHashCode), avromsg);
    try {
        RecordMetadata recordMetadata = producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    String message = String.format("sent message to topic:%s partition:%s  offset:%s",
                                    recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                    LOG.debug("message:: {}", message);
                } else {
                    // exception handling
                    LOG.error("Exception:: {}" , e);
                }
            }
        }).get();
    } catch (InterruptedException e) {
        LOG.error("Exception:: {}", e);
        throw e;
    } catch (ExecutionException e) {
        LOG.error("Exception:: {}", e);
        throw e;
    }
}
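A note on the send loop above: calling `.get()` on every `producer.send(...)` blocks until that record is acknowledged, which keeps one request in flight at a time and defeats batching. A minimal sketch of the alternative pattern, collecting the futures and waiting once after the loop; `CompletableFuture` and the `send()` helper are stand-ins here (not the Kafka API) so the sketch runs without a broker:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch only: collect in-flight sends and wait once after the loop,
// instead of blocking per record with .get().
public class BatchSendSketch {
    // Hypothetical stand-in for producer.send(record, callback).
    static CompletableFuture<Integer> send(int i) {
        return CompletableFuture.supplyAsync(() -> i);
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            inFlight.add(send(i)); // no per-record .get(): sends can batch
        }
        // Wait once for the whole batch instead of once per record.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        System.out.println("completed=" + inFlight.size()); // prints "completed=1000"
    }
}
```

With the real producer, `producer.send(record, callback)` already returns a `Future<RecordMetadata>`, so the same collect-then-wait shape applies; delivery failures still surface in the callback or when the futures are joined.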

Producer configuration

Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, producerApplicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerEndpoint());
LOG.debug("BOOTSTRAP_SERVERS_CONFIG value :: {}", BrokerEndpoint());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SchemaRegistryUrl());
LOG.debug("SCHEMA_REGISTRY_URL_CONFIG value :: {}", SchemaRegistryUrl());
// create safe producer
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, Integer.toString(5));
props.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, Boolean.toString(EnableIdempotenceConfig()));
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
props.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Integer.toString(5)); // safe with idempotence on Kafka >= 1.1 (we run 2.0); use 1 otherwise
props.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.toString(60000));
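For the memory question itself, it may help to know what the producer is even allowed to buffer on the client side. Assuming default `buffer.memory` and `batch.size` (neither is set in the config above) and the 100 partitions mentioned later in the question, a back-of-envelope bound:

```java
// Rough client-side buffering bound for the producer (assumed defaults;
// neither buffer.memory nor batch.size appears in the config above).
public class ProducerMemoryBound {
    public static void main(String[] args) {
        long bufferMemory = 32L * 1024 * 1024; // buffer.memory default: 33554432
        int batchSize = 16 * 1024;             // batch.size default: 16384
        int partitions = 100;                  // topic layout from the question

        // Worst case of one open batch per partition, all inside buffer.memory.
        long openBatchBytes = (long) batchSize * partitions;
        System.out.println("buffer.memory cap (MiB) = " + bufferMemory / (1024 * 1024)); // prints 32
        System.out.println("open batches (KiB) = " + openBatchBytes / 1024);             // prints 1600
    }
}
```

If heap growth goes well past this cap, the leak is more likely outside the record accumulator (per-send allocations, metrics, serializer caches) than in the producer's buffer itself.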

Kafka consumer code and settings

try (KafkaConsumer<String, avromsg> kafkaConsumer = getKafkaConsumer()) {
    kafkaConsumer.subscribe(Arrays.asList(applicationProperties.getTopicName()));
    while (true) {
        ConsumerRecords<String, avromsg> records = kafkaConsumer.poll(Duration.ofMillis(1000)); // reuse the consumer from the try-with-resources block; calling getKafkaConsumer() here risks creating a fresh, unsubscribed consumer on every iteration
        int recNum = records.count();
        LOG.debug("Number of records retrieved from kafka: {}", recNum);
        if (recNum > 0 && LOG.isDebugEnabled()) {
            records.partitions().forEach(partition -> {
                List<ConsumerRecord<String, avromsg>> partitionRecords = records.records(partition);
                if (!partitionRecords.isEmpty()) {
                    LOG.debug("Records distribution: partition: {}, records: {}, last offset {}", partition.partition(), partitionRecords.size(), partitionRecords.get(partitionRecords.size() - 1).offset());
                }
            });
        }
        //records.forEach(record -> LOG.info("record value", record.value()));
        for (ConsumerRecord<String, avromsg> record : records) {
            LOG.debug("Received new record: partition: {}, offset: {}, key: {}  value: {}", record.partition(), record.offset(), record.key(), record.value());
            IEventMsg eventmsg = new KafkaEventMsg(record.value());
            IEventMsgHandler.handle(eventmsg); // here we basically perform insertion or update in maria db
            recNum--;
        }
    }
}

Consumer configuration

Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, ConsumerApplicationID());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BrokerEndpoint());
LOG.debug("BOOTSTRAP_SERVERS_CONFIG - value :: {}", props.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SchemaRegistryUrl());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GroupID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); // ensures records are deserialized into the specific Avro class
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 86400000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 360);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 524288);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 5242880);
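Using the numbers from this config and the cluster described below (3 brokers, 100 partitions, both taken from the question), the consumer's fetch buffers have a rough upper bound. A sketch of the arithmetic, ignoring decompression overhead:

```java
// Rough bound on consumer-side fetch buffering, using the question's settings.
public class ConsumerFetchBound {
    public static void main(String[] args) {
        int maxPartitionFetch = 524288; // max.partition.fetch.bytes (set above)
        long fetchMaxBytes = 5242880;   // fetch.max.bytes (set above)
        int brokers = 3;                // cluster from the question
        int partitions = 100;           // topic from the question

        // fetch.max.bytes caps one fetch response, and the consumer fetches
        // from each broker concurrently, so buffered fetch data can approach
        // brokers * fetch.max.bytes at a time.
        long bufferedBound = brokers * fetchMaxBytes;
        // Without the fetch.max.bytes cap, per-partition limits alone would
        // allow up to partitions * max.partition.fetch.bytes.
        long partitionBound = (long) maxPartitionFetch * partitions;

        System.out.println("buffered bound (MiB) = " + bufferedBound / (1024 * 1024));   // prints 15
        System.out.println("partition bound (MiB) = " + partitionBound / (1024 * 1024)); // prints 50
    }
}
```

So fetch buffering alone should stay in the tens of MiB; the 400 to 500 MB you observe on the consumer is likely dominated by heap other than fetch buffers.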

We tried tuning both the producer and consumer configuration, but that did not solve the problem. I plan to attach a profiler now to find the actual cause, but any suggestions are welcome. The consumer is fairly stable, with memory usage between 400 MB and 500 MB, but the producer's memory keeps growing slowly.

Version details:

<kafka.client.version>3.3.1</kafka.client.version>
<confluent.version>5.3.0</confluent.version>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.0</version>

We currently have one topic with 100 partitions and 3 brokers, one producer, and one consumer.

java apache-kafka kafka-consumer-api avro confluent-schema-registry