Reputation: 597
I am using KafkaJS library for consuming messages from a kafka topic. My requirement is to get the messages into an array which I can use in my application later. I can log the messages in the console but not able to populate the array
const { Kafka, logLevel } = require("kafkajs")
const path = require("path");
const fs = require("fs");
const clientId = "my-client";
const brokers = ["localhost:443"]
const topic = "client.attributes";
const kafka = new Kafka({
clientId,
brokers,
logLevel: logLevel.INFO,
connectionTimeout: 5000,
retry: {
initialRetryTime: 3000,
retries: 2,
},
});
const consumer = kafka.consumer({
groupId: clientId,
minBytes: 5,
maxBytes: 1e6,
maxWaitTimeInMs: 3000,
})
const consume = async () => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });
const arr = [];
let sample = await consumer.run({
// this function is called every time the consumer gets a new message
eachMessage: ({ message }) => {
// here, we just log the message to the standard output
let clientData = JSON.stringify(message.value);
arr.push(clientData);
},
});
console.log(arr)
};
module.exports = {consume}
In the index.js
file, I have the following code
const consume = require("./consume")
consume().catch((err) => {
console.error("error in consumer", err);
});
How to get the messages into the array? Currently when I run node index.js
, I am getting an empty array. What am I missing?
Upvotes: 1
Views: 361
Reputation: 2535
instead of using eachMessage
you can use eachBatch
:
await consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
uncommittedOffsets,
isRunning,
isStale,
pause,
}) => {
for (let message of batch.messages) {
console.log({
topic: batch.topic,
partition: batch.partition,
highWatermark: batch.highWatermark,
message: {
offset: message.offset,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
}
})
resolveOffset(message.offset)
await heartbeat()
}
},
})
if you need read more about eachBatch
take a look at eachBatch documentation
Upvotes: 0