Adrian

Reputation: 494

kafka-node does not receive messages in real time

I followed the quick start guide (http://kafka.apache.org/documentation.html#quickstart) and would like to write a consumer in node.js. The topic 'test' was created successfully; I can produce messages with kafka-console-producer.sh and receive them with kafka-console-consumer.sh.

I wrote a simple consumer (live.js):

var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181/'),
    consumer = new kafka.Consumer(client,
                                  [{ topic: 'test', partition: 0 }],
                                  { autoCommit: true });

client.on('ready', function(){
  console.log('Client ready!');
});

console.log(client);
console.log(consumer);

consumer.on('error', function (err) {
  console.log("Kafka Error: Consumer - " + err);
});

consumer.on('offsetOutOfRange', function (err){
  console.log("Kafka offsetOutOfRange: " + err);
});

consumer.on('message', function(message){
  console.log(message);
});

When I run node live.js, I receive all previously sent messages. But while live.js is running and I produce a message with the Kafka-supplied script, live.js does not receive it (the console consumer shipped with Kafka does). After restarting live.js, I receive the new messages, but I would like to get them in real time. I use the default configuration; here is what live.js logs:

EventEmitter {
  connectionString: 'localhost:2181/',
  clientId: 'kafka-node-client',
  zkOptions: undefined,
  noAckBatchOptions: undefined,
  brokers: {},
  longpollingBrokers: {},
  topicMetadata: {},
  topicPartitions: {},
  correlationId: 0,
  _socketId: 0,
  cbqueue: {},
  brokerMetadata: {},
  ready: false,
  zk: 
   EventEmitter {
     client: 
      Client {
        domain: null,
        _events: [Object],
        _eventsCount: 3,
        _maxListeners: undefined,
        connectionManager: [Object],
        options: [Object],
        state: [Object] },
     _events: 
      { init: [Object],
        brokersChanged: [Function],
        disconnected: [Object],
        error: [Function] },
     _eventsCount: 4 },
  _events: 
   { ready: [ [Function], [Function] ],
     error: [Function],
     close: [Function],
     brokersChanged: [Function] },
  _eventsCount: 4 }
EventEmitter {
  fetchCount: 0,
  client: 
   EventEmitter {
     connectionString: 'localhost:2181/',
     clientId: 'kafka-node-client',
     zkOptions: undefined,
     noAckBatchOptions: undefined,
     brokers: {},
     longpollingBrokers: {},
     topicMetadata: {},
     topicPartitions: {},
     correlationId: 0,
     _socketId: 0,
     cbqueue: {},
     brokerMetadata: {},
     ready: false,
     zk: EventEmitter { client: [Object], _events: [Object], _eventsCount: 4 },
     _events: 
      { ready: [Object],
        error: [Function],
        close: [Function],
        brokersChanged: [Function] },
     _eventsCount: 4 },
  options: 
   { autoCommit: true,
     groupId: 'kafka-node-group',
     autoCommitMsgCount: 100,
     autoCommitIntervalMs: 5000,
     fetchMaxWaitMs: 100,
     fetchMinBytes: 1,
     fetchMaxBytes: 1048576,
     fromOffset: false,
     encoding: 'utf8' },
  ready: false,
  paused: undefined,
  id: 0,
  payloads: 
   [ { topic: 'test',
       partition: 0,
       offset: 0,
       maxBytes: 1048576,
       metadata: 'm' } ],
  _events: { done: [Function] },
  _eventsCount: 1,
  encoding: 'utf8' }
Client ready!

--EDIT--

After stopping live.js and starting it again, the ZooKeeper log shows the following:

[2016-01-27 15:53:20,135] INFO Accepted socket connection from /127.0.0.1:38166 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-27 15:53:20,139] WARN Connection request from old client /127.0.0.1:38166; will be dropped if server is in r-o mode (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,140] INFO Client attempting to establish new session at /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,166] INFO Established session 0x1528384e45e0007 with negotiated timeout 30000 for client /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)

Upvotes: 0

Views: 4237

Answers (2)

WT.D

Reputation: 43

Same issue here. Downgrading kafka-node to version 0.2.27 solves it.
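If you go this route, pin the dependency explicitly (for example with npm install kafka-node@0.2.27) so a later install does not silently upgrade it again. A quick way to confirm which version Node actually loads is a one-liner like this (just a sanity-check sketch; it assumes kafka-node is installed locally under node_modules):

// Prints the kafka-node version resolved from node_modules
console.log(require('kafka-node/package.json').version);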

Upvotes: 0

user2178244

Reputation: 38

Try using the HighLevelConsumer; it works well until you have more specific needs. I am using the following options for the HighLevelConsumer:

{
    groupId: 'Consumer group',
    // Auto commit config
    autoCommit: true,
    autoCommitMsgCount: 100,
    autoCommitIntervalMs: 5000,
    // Fetch message config
    fetchMaxWaitMs: 100,
    fetchMinBytes: 1,
    fetchMaxBytes: 1024 * 10,
    fromOffset: true,
    fromBeginning: false, // stop reading from the beginning
    encoding: 'utf8'
}
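For completeness, here is a minimal sketch of how these options could be wired into a HighLevelConsumer using the same ZooKeeper-based kafka-node 0.x API the question uses (the topic name and ZooKeeper address are taken from the question; treat it as an illustration, not a verified drop-in fix):

var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181/'),
    consumer = new kafka.HighLevelConsumer(client,
                                           [{ topic: 'test' }],
                                           {
                                             groupId: 'Consumer group',
                                             autoCommit: true,
                                             autoCommitMsgCount: 100,
                                             autoCommitIntervalMs: 5000,
                                             fetchMaxWaitMs: 100,
                                             fetchMinBytes: 1,
                                             fetchMaxBytes: 1024 * 10,
                                             fromOffset: true,
                                             fromBeginning: false,
                                             encoding: 'utf8'
                                           });

// The HighLevelConsumer balances partitions across the consumer group,
// so no explicit partition is listed in the payload.
consumer.on('message', function (message) {
  console.log(message);
});

consumer.on('error', function (err) {
  console.log('Kafka Error: HighLevelConsumer - ' + err);
});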

Upvotes: 0
