使用 Librdkafka 向 kafka 服务器发送消息时出现内存泄漏

问题描述 投票:0回答:1

我正在设置一个 C 应用程序,该应用程序应该将传感器数据发送到 kafka 服务器。该消息仅包含一个 JSON 字符串,其中包含所有传感器名称及其值。

kafka生产者是这样设置的:

int setupKafkaProducer(struct KafkaParameters *kafkaParameters, struct ClientOPCEndpointInfo* *clientInfos, int clientInfosLength, bool runTest)
{
    logInfo("START - Setting up kafka producer", true);

    conf = rd_kafka_conf_new();

    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    rd_kafka_conf_res_t res = RD_KAFKA_CONF_OK;

    // setting up parameters ...

    if (res != RD_KAFKA_CONF_OK)
    {
        g_error("Failed to setup kafka config: %s", errstr);
        logError("Failed to setup kafka config", true);

        return 1;
    }    

    producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

    if (!producer)
    {
        g_error("Failed to create new producer: %s", errstr);
        logError("Failed to create new producer!", true);

        return 1;
    }

    conf = NULL;

    return 0;
}

消息回调仅用于报告发送kafka消息时可能出现的错误。

消息的发送方式如下:

int sendKafkaMessage(char *kafkaMessage)
{
    int message_count = 1;
    const char *topic = kafkaTopic;
    const char *value = kafkaMessage;

    for (int i = 0; i < message_count; i++)
    {
        size_t value_len = strlen(value);

        rd_kafka_resp_err_t err;

        err = rd_kafka_producev(producer,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_KEY(NULL, 0),
                                RD_KAFKA_V_VALUE((void*)value, value_len),
                                RD_KAFKA_V_OPAQUE(NULL),
                                RD_KAFKA_V_END);

        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
        {
            // g_warning("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
            // logError("Failed to produce topic!", true);

            return 1;
        }
        else
        {
            // g_message("Produced event to topic %s: value = %12s", topic, value);
        }

        rd_kafka_poll(producer, 0);
    }

    // g_message("Flushing final messages..");
    rd_kafka_flush(producer, 100);

    if (rd_kafka_outq_len(producer) > 0)
    {
        // g_warning("%d message(s) were not delivered", rd_kafka_outq_len(producer));
        // logError("Kafka message(s) were not delivered!", true);

        return 1;
    }

    // g_message("%d events were produced to topic %s.", message_count, topic);

    return 0;
}

我怀疑内存泄漏,因为程序在一段时间后被 Ubuntu 杀死了。 Valgrind 报告如下:

==19032== 92,178 bytes in 9 blocks are definitely lost in loss record 45 of 45
==19032==    at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==19032==    by 0x4A37F15: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032==    by 0x49FC06A: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032==    by 0x49E48E3: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032==    by 0x49F0B59: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032==    by 0x49F0F79: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032==    by 0x49B0D67: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032==    by 0x4DC7934: start_thread (pthread_create.c:439)
==19032==    by 0x4E58BF3: clone (clone.S:100)

这似乎是 librdkafka 库的内部问题。但我无法判断这是我的错误使用造成的还是库本身的错误造成的。

c memory-leaks valgrind librdkafka
1个回答
0
投票

为了使泄漏报告有意义,您的应用程序需要彻底终止。

这意味着没有被杀死,只是干净地退出

main()

© www.soinside.com 2019 - 2024. All rights reserved.