BIndu_Madhav
BIndu_Madhav

Reputation: 597

Populating kafka messages in an array in Javascript

I am using the KafkaJS library to consume messages from a Kafka topic. My requirement is to get the messages into an array that I can use in my application later. I can log the messages to the console, but I am not able to populate the array.

const { Kafka, logLevel } = require("kafkajs")
const path = require("path");
const fs = require("fs");

// Shared identity: used as the Kafka client id and as the consumer group id.
const clientId = "my-client";
// Broker addresses handed to the KafkaJS client.
const brokers = ["localhost:443"];
// Topic the consumer subscribes to.
const topic = "client.attributes";

// KafkaJS client configured with the connection and retry settings below.
const kafka = new Kafka({
  clientId: clientId,
  brokers: brokers,
  logLevel: logLevel.INFO,
  connectionTimeout: 5000,
  retry: { initialRetryTime: 3000, retries: 2 },
});

// Consumer created from the client above; its group id reuses the client id.
const consumer = kafka.consumer({
  groupId: clientId,
  minBytes: 5,
  maxBytes: 1000000,
  maxWaitTimeInMs: 3000,
});

/**
 * Connects the consumer, subscribes to `topic` from the beginning, and collects
 * every message value (decoded to a string) into an array.
 *
 * `consumer.run()` resolves once the consumer has started, not once messages
 * have been handled, so the array is still empty at that point. The array is
 * therefore logged from inside the message handler, and it is returned so that
 * callers can read it as it fills up.
 *
 * @returns {Promise<Array<string|null>>} Live array that grows as messages
 *   arrive; an entry is `null` when the message carried no value.
 */
const consume = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic, fromBeginning: true });

  const arr = [];
  await consumer.run({
    // Invoked once per message, after run() itself has already resolved.
    eachMessage: async ({ message }) => {
      // message.value is a Buffer; JSON.stringify on a Buffer produces
      // {"type":"Buffer","data":[...]} rather than the payload text.
      const clientData = message.value == null ? null : message.value.toString();
      arr.push(clientData);
      console.log(arr);
    },
  });

  return arr;
};

// Named export: the module's value is an object, so importers must destructure
// it, e.g. const { consume } = require("./consume")
module.exports = {consume}

In the index.js file, I have the following code

// consume.js exports an object ({ consume }), so the function has to be
// destructured; calling the module object itself throws a TypeError.
const { consume } = require("./consume");

// Start the consumer and report any failure.
consume().catch((err) => {
  console.error("error in consumer", err);
});

How to get the messages into the array? Currently when I run node index.js, I am getting an empty array. What am I missing?

Upvotes: 1

Views: 361

Answers (1)

Hasan Haghniya
Hasan Haghniya

Reputation: 2535

Instead of using eachMessage, you can use eachBatch:

// Processes messages a batch at a time, logging each one and marking its
// offset as resolved before sending a heartbeat.
await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async ({
        batch,
        resolveOffset,
        heartbeat,
        commitOffsetsIfNecessary,
        uncommittedOffsets,
        isRunning,
        isStale,
        pause,
    }) => {
        for (const message of batch.messages) {
            console.log({
                topic: batch.topic,
                partition: batch.partition,
                highWatermark: batch.highWatermark,
                message: {
                    offset: message.offset,
                    // key and value are null for key-less messages and
                    // tombstones; guard so one such message cannot throw.
                    key: message.key == null ? null : message.key.toString(),
                    value: message.value == null ? null : message.value.toString(),
                    headers: message.headers,
                }
            })

            resolveOffset(message.offset)
            await heartbeat()
        }
    },
})

If you need to read more about eachBatch, take a look at the eachBatch documentation.

Upvotes: 0

Related Questions