Hith
Hith

Reputation: 448

push messages from kafka consumer to mongodb

I have created kafka consumer using 'kafka-node', on the event

consumer.on('message', () => {
  // connect to MongoDB and insert into a collection
})

The mongo.js file is used to create the connection to MongoDB and return the database object:

const MongoClient = require('mongodb').MongoClient, assert = require('assert');

const url = 'mongodb://root:****@ds031257.mlab.com:31257/kafka-node';

// Module-level cache: one client/db pair shared by all callers, so repeated
// connectDB() calls reuse the pool instead of opening a new connection each time.
let _db;      // cached Db handle returned by getDB()
let _client;  // cached MongoClient — needed by close(); Db has no close() in driver 3.x

/**
 * Opens (or reuses) the MongoDB connection.
 * @param {function(Error=)} callback - invoked with an error on failure, or
 *   with no argument on success; afterwards getDB() returns the Db handle.
 */
const connectDB = (callback) => {
    // Already connected: report success immediately.
    if (_db) return callback(null);
    MongoClient.connect(url, { useNewUrlParser: true }, (err, client) => {
        if (err) {
            // On failure `client` is null — bail out before dereferencing it.
            // (The original called client.db() unconditionally, which is the
            // source of "Cannot read property 'db' of null".)
            return callback(err);
        }
        _client = client;
        _db = client.db('kafka-node');
        return callback(null);
    });
};

// Returns the cached Db handle, or undefined if connectDB has not succeeded yet.
const getDB = () => _db;

// Close the underlying client (and its pool), not the Db object.
const close = () => _client && _client.close();

module.exports = { connectDB, getDB, close };

consumer.js creates the consumer and pushes the messages to MongoDB:

let kafka = require('kafka-node');
let MongoDB = require('./mongo');
let Consumer = kafka.Consumer,
    // The client specifies the ip of the Kafka producer and uses
    // the zookeeper port 2181.
    // NOTE: kafka-node splits kafkaHost on ','; the original string had
    // spaces after the commas, producing invalid hosts like " localhost:9094".
    client = new kafka.KafkaClient({ kafkaHost: 'localhost:9093,localhost:9094,localhost:9095' });
// The consumer object specifies the client and topic(s) it subscribes to.
// (Declared with const — the original leaked `consumer` as an implicit global.)
const consumer = new Consumer(
    client, [{ topic: 'infraTopic', partitions: 3 }], { autoCommit: false });

consumer.on('ready', function () {
    console.log('consumer is ready');
});

consumer.on('error', function (err) {
    console.log('consumer is in error state');
    console.log(err);
});

client.refreshMetadata(['infraTopic'], (err) => {
    if (err) {
        console.warn('Error refreshing kafka metadata', err);
    }
});

// Connect to MongoDB ONCE at startup, then register the message handler.
// The original opened a brand-new connection for every Kafka message, which
// exhausts connections under load and eventually makes getDB() return null
// ("Cannot read property 'db' of null" once the consumer reaches the end of
// the log).
MongoDB.connectDB((err) => {
    if (err) throw err;
    // Load db & collection handles once and reuse them for every message.
    const db = MongoDB.getDB();
    const collectionKafka = db.collection('sampleCollection');

    consumer.on('message', function (message) {
        // grab the main content from the Kafka message
        console.log(message);
        collectionKafka.insertOne(
            {
                timestamp: message.value,
                topic: message.topic
            },
            function (insertErr) {
                if (insertErr) {
                    // Log and keep consuming. The original called
                    // database.close() here, but `database` was never defined
                    // in this scope — that line threw a ReferenceError on any
                    // insert failure.
                    return console.log(insertErr);
                }
                // Success
            }
        );
    });
});

Is this the right way to push messages to MongoDB from a Kafka consumer? With this setup it works until all the messages are written, but once it reaches the end of the log it throws "Cannot read property 'db' of null".

Upvotes: 1

Views: 2679

Answers (1)

Robin Moffatt
Robin Moffatt

Reputation: 32100

is this the right way to push messages to mongodb from a kafka consumer?

I guess it's one way, but I'd not call it the right way :)

Much better is to use Kafka Connect. It's part of Apache Kafka, and it is designed to do exactly what you're trying to do - stream data from Kafka to a target system (you can also use it for streaming data from other systems into Kafka).

There's an excellent connector for MongoDB with comprehensive documentation which will do exactly what you're trying to do.

If you need to process the data before writing it then the pattern to follow is process with Kafka Streams, KSQL, or whichever processing tool you want to use—but write it back to a Kafka topic. That topic is then read by Kafka Connect and streamed to your target. That way you decouple the responsibilities, and make for a much more simple yet resilient and scalable system.

Upvotes: 2

Related Questions