Stella
Stella

Reputation: 1868

Running node-rdkafka code in server

I'm running the node-rdkafka code below in Eclipse as a Node.js application. This is the sample code from https://blizzard.github.io/node-rdkafka/current/tutorial-producer_.html

I want to run this on a test server and call it from an iOS mobile application. I know about running a Node.js app on AWS. Question I: Are there any other options for running it in a free test server environment, like Tomcat? Question II: Even if I am able to run this Node.js app on a server, how do I call it from a mobile application? Do I need to call producer.on('ready', function(arg) {...}), or which function do I need to call from the mobile app?

// Sample node-rdkafka producer: connects to a local broker, sends
// `maxMessages` keyed messages to `topicName`, polls for delivery reports,
// and disconnects once one report per message has been received.
const Kafka = require('node-rdkafka');
//console.log(Kafka.features);
//console.log(Kafka.librdkafkaVersion);

const producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
  // request delivery reports; they are consumed by the 'delivery-report' handler below
  'dr_cb': true
});

const topicName = 'MyTest';

//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
  console.log(log);
});

//logging all errors
producer.on('event.error', function(err) {
  console.error('Error from producer');
  console.error(err);
});

//counter to stop this sample after maxMessages are sent
let counter = 0;
const maxMessages = 10;

producer.on('delivery-report', function(err, report) {
  // Surface failed deliveries instead of silently dropping the error.
  if (err) {
    console.error('delivery-report error');
    console.error(err);
  }
  console.log('delivery-report: ' + JSON.stringify(report));
  // Count failed reports too, so the poll loop below can still terminate.
  counter++;
});

//Wait for the ready event before producing
producer.on('ready', function(arg) {
  console.log('producer ready.' + JSON.stringify(arg));

  for (let i = 0; i < maxMessages; i++) {
    // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
    const value = Buffer.from('MyProducerTest - value-' + i);
    const key = 'key-' + i;
    // if partition is set to -1, librdkafka will use the default partitioner
    const partition = -1;
    producer.produce(topicName, partition, value, key);
  }

  //need to keep polling for a while to ensure the delivery reports are received
  const pollLoop = setInterval(function() {
    producer.poll();
    if (counter === maxMessages) {
      clearInterval(pollLoop);
      producer.disconnect();
    }
  }, 1000);

});

/*
producer.on('disconnected', function(arg) {
  console.log('producer disconnected. ' + JSON.stringify(arg));
});*/

//starting the producer
producer.connect();

Upvotes: 0

Views: 3240

Answers (2)

Sai Manikanta
Sai Manikanta

Reputation: 31

I might be late on this, but this is how I did it using promises; I found it better than using a timeout, etc.

/**
 * Publishes the JSON-serialised `req.body.data` to the Kafka topic named by
 * `req.body.topicName` and reports the outcome through a promise.
 *
 * Relies on names defined outside this function: `producer`, `kafkaEvents`,
 * `globalConfigs`, `logger` and `uuid`.
 *
 * NOTE(review): every call attaches a new set of listeners to the shared
 * `producer` and calls `connect()` again; consider connecting once at start-up
 * and removing the listeners when the promise settles.
 *
 * @param {object} req - Request whose `body` carries `topicName`, `data`, and
 *   optionally `key` and `partition`.
 * @param {object} res - Unused here; the caller sends the response from its
 *   own `.then` / `.catch`.
 * @returns {Promise<{teamName: *, topicName: *, key: string}>} Resolves when a
 *   delivery report without error arrives; rejects on a producer error, a
 *   synchronous `produce` failure, or a failed delivery report.
 */
const postMessageToPublisher = (req, res) => {
  return new Promise((resolve, reject) => {
    const requestBody = req.body;
    const topicName = requestBody.topicName;

    // Use the caller-supplied key, or generate one.
    const key = requestBody.key || uuid();

    // Explicit null check: `|| undefined` would discard the valid partition 0.
    const partition =
      requestBody.partition != null ? requestBody.partition : undefined;

    const message = Buffer.from(JSON.stringify(requestBody.data));

    /**
     * Actual messages are sent here when the producer is ready
     */
    producer.on(kafkaEvents.READY, () => {
      try {
        producer.produce(
          topicName,
          partition,
          message,
          key // setting key user provided or UUID
        );
      } catch (error) {
        reject(error);
      }
    });

    // Register listener for debug information; only invoked if debug option set in driver_options
    producer.on(kafkaEvents.LOG, log => {
      logger.info('Producer event log notification for debugging:', log);
    });

    // Register error listener
    producer.on(kafkaEvents.ERROR, err => {
      logger.error('Error from producer:' + JSON.stringify(err));
      reject(err);
    });

    // Register delivery report listener
    producer.on(kafkaEvents.PUBLISH_ACKNOWLEDGMENT, (err, ackMessage) => {
      if (err) {
        logger.error(
          'Delivery report: Failed sending message ' + ackMessage.value
        );
        logger.error('and the error is :', err);
        reject({ value: ackMessage.value, error: err });
      } else {
        resolve({
          teamName: globalConfigs.TeamNameService,
          topicName: ackMessage.topic,
          key: ackMessage.key.toString()
        });
      }
    });

    // Connect only after the listeners are attached.
    producer.connect();
    producer.setPollInterval(globalConfigs.producerPollingTime);
  });
};

Please note that kafkaEvents contains my constants for the events we listen to; it is just a reference, e.g. kafkaEvents.LOG is the same as event.log.

Also, the calling function expects this to return a promise, so we use .then(data => 'send your response to the user from here') and .catch(error => 'send the error response to the user'). This is how I achieved it using promises.

Upvotes: 0

OneCricketeer
OneCricketeer

Reputation: 191758

First of all, you need an HTTP server. ExpressJS can be used. Then, just tack on the Express code basically at the end, but move the producer loop into the request route.

So, start with what you had

// Producer setup: creates a node-rdkafka producer for a local broker,
// attaches logging handlers for its events, and starts the connection.
// Messages are produced elsewhere once the producer is ready.
const Kafka = require('node-rdkafka');
//console.log(Kafka.features);
//console.log(Kafka.librdkafkaVersion);

const producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
  // request delivery reports; they are consumed by the 'delivery-report' handler below
  'dr_cb': true
});

const topicName = 'MyTest';

//logging debug messages, if debug is enabled
producer.on('event.log', function(log) {
  console.log(log);
});

//logging all errors
producer.on('event.error', function(err) {
  console.error('Error from producer');
  console.error(err);
});

// The original handler incremented `counter`, which is never declared in this
// version of the script and would throw a ReferenceError on the first report.
producer.on('delivery-report', function(err, report) {
  // Surface failed deliveries instead of silently dropping the error.
  if (err) {
    console.error('delivery-report error');
    console.error(err);
  }
  console.log('delivery-report: ' + JSON.stringify(report));
});

//Wait for the ready event before producing
producer.on('ready', function(arg) {
  console.log('producer ready.' + JSON.stringify(arg));
});

producer.on('disconnected', function(arg) {
  console.log('producer disconnected. ' + JSON.stringify(arg));
});

//starting the producer
producer.connect();

Then, you can add this in the same file.

// HTTP front end for the producer: GET / answers with a fixed status string,
// POST /:maxMessages queues that many test messages on `topicName` through
// the `producer` defined earlier in this file.
const express = require('express');
const app = express();

app.get('/', (req, res) => res.send('Ready to send messages!'));

app.post('/:maxMessages', function (req, res) {
  // Always pass a radix, and reject anything that is not a usable count.
  // NOTE(review): no upper bound is enforced on the count — consider a cap.
  const maxMessages = Number.parseInt(req.params.maxMessages, 10);
  if (Number.isNaN(maxMessages) || maxMessages < 0) {
    res.status(400).send('maxMessages must be a non-negative integer');
    return;
  }

  try {
    for (let i = 0; i < maxMessages; i++) {
      // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
      const value = Buffer.from('MyProducerTest - value-' + i);
      const key = 'key-' + i;
      // if partition is set to -1, librdkafka will use the default partitioner
      const partition = -1;
      producer.produce(topicName, partition, value, key);
    }
  } catch (err) {
    // A synchronous throw from produce() would otherwise be unhandled here.
    console.error('Failed to produce messages');
    console.error(err);
    res.status(500).send('Failed to produce messages');
    return;
  }

  // Without a response the client request would hang until it times out.
  res.send('Queued ' + maxMessages + ' messages');
}); // end app.post()

app.listen(3000, () => console.log('Example app listening on port 3000!'));

I don't think the poll loop is necessary since you don't care about the counter anymore.

Now, connect your mobile app to http://<your server IP>:3000/ and send test messages with a POST request to http://<your server IP>:3000/10, for example, and adjust to change the number of messages to send

Upvotes: 1

Related Questions