Reputation: 494
I followed the quick start guide (http://kafka.apache.org/documentation.html#quickstart) and would like to write a consumer in Node.js. The topic 'test' was created successfully, and I can produce messages with kafka-console-producer.sh and receive them with kafka-console-consumer.sh.
I wrote a simple consumer (live.js):
// live.js: minimal kafka-node consumer that prints every message it reads
// from partition 0 of the 'test' topic.
const kafka = require('kafka-node');

const client = new kafka.Client('localhost:2181/');

const subscriptions = [{ topic: 'test', partition: 0 }];
const consumerOptions = { autoCommit: true };
const consumer = new kafka.Consumer(client, subscriptions, consumerOptions);

client.on('ready', () => {
  console.log('Client ready!');
});

// Dump both objects right after construction, before any consumer handlers
// are attached, so their initial state is visible in the output.
console.log(client);
console.log(consumer);

consumer.on('error', (err) => {
  console.log("Kafka Error: Consumer - " + err);
});

consumer.on('offsetOutOfRange', (err) => {
  console.log("Kafka offsetOutOfRange: " + err);
});

consumer.on('message', (message) => {
  console.log(message);
});
While running node live.js
I receive all previously sent messages. But while live.js is running, a message that I produce with the Kafka-supplied script is not received by live.js (although it is received by the consumer script shipped with Kafka). After restarting live.js I do receive the messages, but I would like to get them in real time. I use the default configuration; here are the logs from live.js:
EventEmitter {
connectionString: 'localhost:2181/',
clientId: 'kafka-node-client',
zkOptions: undefined,
noAckBatchOptions: undefined,
brokers: {},
longpollingBrokers: {},
topicMetadata: {},
topicPartitions: {},
correlationId: 0,
_socketId: 0,
cbqueue: {},
brokerMetadata: {},
ready: false,
zk:
EventEmitter {
client:
Client {
domain: null,
_events: [Object],
_eventsCount: 3,
_maxListeners: undefined,
connectionManager: [Object],
options: [Object],
state: [Object] },
_events:
{ init: [Object],
brokersChanged: [Function],
disconnected: [Object],
error: [Function] },
_eventsCount: 4 },
_events:
{ ready: [ [Function], [Function] ],
error: [Function],
close: [Function],
brokersChanged: [Function] },
_eventsCount: 4 }
EventEmitter {
fetchCount: 0,
client:
EventEmitter {
connectionString: 'localhost:2181/',
clientId: 'kafka-node-client',
zkOptions: undefined,
noAckBatchOptions: undefined,
brokers: {},
longpollingBrokers: {},
topicMetadata: {},
topicPartitions: {},
correlationId: 0,
_socketId: 0,
cbqueue: {},
brokerMetadata: {},
ready: false,
zk: EventEmitter { client: [Object], _events: [Object], _eventsCount: 4 },
_events:
{ ready: [Object],
error: [Function],
close: [Function],
brokersChanged: [Function] },
_eventsCount: 4 },
options:
{ autoCommit: true,
groupId: 'kafka-node-group',
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1048576,
fromOffset: false,
encoding: 'utf8' },
ready: false,
paused: undefined,
id: 0,
payloads:
[ { topic: 'test',
partition: 0,
offset: 0,
maxBytes: 1048576,
metadata: 'm' } ],
_events: { done: [Function] },
_eventsCount: 1,
encoding: 'utf8' }
Client ready!
--EDIT--
After stopping live.js and starting it again Zookeeper log shows the following:
[2016-01-27 15:53:20,135] INFO Accepted socket connection from /127.0.0.1:38166 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-27 15:53:20,139] WARN Connection request from old client /127.0.0.1:38166; will be dropped if server is in r-o mode (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,140] INFO Client attempting to establish new session at /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,166] INFO Established session 0x1528384e45e0007 with negotiated timeout 30000 for client /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
Upvotes: 0
Views: 4237
Reputation: 38
Try using the HighLevelConsumer; it works well unless you have some specific needs. I am using the following options for the high-level consumer:
// Options object for the high-level consumer.
{
groupId: "Consumer group",
// Auto commit config
autoCommit: true,
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
// Fetch message config
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 10, // 10 KiB
fromOffset: true,
fromBeginning: false, // to stop reading from the beginning
encoding:'utf8'
}
Upvotes: 0