Reputation: 95
I am setting up a C application that is supposed to send sensor data to a Kafka server. Each message consists of a single JSON string containing all the sensor names and their values.
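For illustration, a message payload looks roughly like this (the sensor names and values here are made up):

{"temperature": 21.7, "humidity": 48.2, "pressure": 1013.25}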
The Kafka producer is set up like this:
int setupKafkaProducer(struct KafkaParameters *kafkaParameters, struct ClientOPCEndpointInfo **clientInfos, int clientInfosLength, bool runTest)
{
    logInfo("START - Setting up kafka producer", true);

    conf = rd_kafka_conf_new();
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    rd_kafka_conf_res_t res = RD_KAFKA_CONF_OK;
    // setting up parameters ...
    if (res != RD_KAFKA_CONF_OK)
    {
        g_error("Failed to setup kafka config: %s", errstr);
        logError("Failed to setup kafka config", true);
        return 1;
    }

    producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer)
    {
        g_error("Failed to create new producer: %s", errstr);
        logError("Failed to create new producer!", true);
        return 1;
    }
    conf = NULL;

    return 0;
}
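The omitted parameter setup is a series of rd_kafka_conf_set() calls, roughly along these lines (simplified; the "brokers" field name is just a placeholder):

    // Simplified example of one such call; "brokers" is a placeholder field name.
    res = rd_kafka_conf_set(conf, "bootstrap.servers", kafkaParameters->brokers,
                            errstr, sizeof(errstr));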
The delivery report callback is only used to report possible errors when sending Kafka messages.
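A minimal sketch of that callback, assuming it only logs delivery errors as described (an illustration, not the exact code):

    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
    {
        (void)rk;
        (void)opaque;
        // Only report failed deliveries; successful deliveries are ignored.
        if (rkmessage->err)
        {
            g_warning("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
            logError("Kafka message delivery failed!", true);
        }
    }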
The messages are sent like this:
int sendKafkaMessage(char *kafkaMessage)
{
    int message_count = 1;
    const char *topic = kafkaTopic;
    const char *value = kafkaMessage;

    for (int i = 0; i < message_count; i++)
    {
        size_t value_len = strlen(value);
        rd_kafka_resp_err_t err;

        err = rd_kafka_producev(producer,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_KEY(NULL, 0),
                                RD_KAFKA_V_VALUE((void *)value, value_len),
                                RD_KAFKA_V_OPAQUE(NULL),
                                RD_KAFKA_V_END);
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
        {
            // g_warning("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
            // logError("Failed to produce topic!", true);
            return 1;
        }
        else
        {
            // g_message("Produced event to topic %s: value = %12s", topic, value);
        }
        rd_kafka_poll(producer, 0);
    }

    // g_message("Flushing final messages..");
    rd_kafka_flush(producer, 100);
    if (rd_kafka_outq_len(producer) > 0)
    {
        // g_warning("%d message(s) were not delivered", rd_kafka_outq_len(producer));
        // logError("Kafka message(s) were not delivered!", true);
        return 1;
    }

    // g_message("%d events were produced to topic %s.", message_count, topic);
    return 0;
}
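For context, a call site builds the JSON payload and passes it to sendKafkaMessage(); a simplified, hypothetical example:

    // Hypothetical usage; sensor names and values are made up. Requires <stdio.h> for snprintf.
    char payload[128];
    snprintf(payload, sizeof(payload),
             "{\"temperature\": %.1f, \"humidity\": %.1f}", 21.7, 48.2);
    if (sendKafkaMessage(payload) != 0)
        logError("Failed to send sensor data!", true);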
I suspected a memory leak, since the program gets killed by Ubuntu after running for a while. Valgrind reports the following:
==19032== 92,178 bytes in 9 blocks are definitely lost in loss record 45 of 45
==19032== at 0x4848899: malloc (in /usr/libexec/valgrind/vgpreload_memcheck-amd64-linux.so)
==19032== by 0x4A37F15: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49FC06A: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49E48E3: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49F0B59: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49F0F79: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x49B0D67: ??? (in /home/.../build/libs/librdkafka.so.1)
==19032== by 0x4DC7934: start_thread (pthread_create.c:439)
==19032== by 0x4E58BF3: clone (clone.S:100)
This looks like an internal allocation in the librdkafka library, but I can't tell whether it is caused by incorrect use on my end or by a bug in the library itself.
Upvotes: 1
Views: 306
Reputation: 6916
For the leak reports to be meaningful, your application needs to terminate cleanly. That means not being killed, but a clean exit from main().
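In practice that means flushing outstanding messages and destroying the producer handle before main() returns; a minimal sketch, assuming the global producer from your code:

    // At shutdown, before returning from main():
    rd_kafka_flush(producer, 10 * 1000);  // give outstanding messages up to 10 s to be delivered
    rd_kafka_destroy(producer);           // releases librdkafka's internal allocations
    return 0;

Once the producer has been destroyed, whatever Valgrind still reports as definitely lost is much more likely to be a genuine leak rather than memory librdkafka simply had not released yet.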
Upvotes: 0