Reputation: 265
I'm trying to use a consumer and a producer with the kafka-node npm library, in order to use Kafka as a messaging service via Node.js.
The problem is that even though my producer sometimes works, the consumer keeps giving me a timeout error, or it just freezes in an endless loop trying to connect to Kafka, even though Kafka is working fine.
I run kafka-node on my Windows machine, while Kafka is on a remote CentOS 7 machine. The weird behavior continues even if I put all the code (consumer and producer) on the same machine as Kafka (thinking maybe Windows was part of the problem).
I tried sending messages with Kafka's built-in producer console, but my consumer still doesn't appear to subscribe to the topic and receive the messages.
This is my simple producer code:
const kafka = require('kafka-node');
const producer = kafka.Producer;

const kclient = new kafka.KafkaClient({ kafkaHost: 'ADDR:9092' });
kclient.on('error', (err) => {
  console.log(err);
});

const kprod = new producer(kclient);
// kconsumer = new consumer(kclient);
kprod.on('error', (err) => {
  console.log(`error: ${err}`);
});
kprod.on('ready', () => {
  console.log(`connected to kafka`);
  let tranNumSentToKafka = 0;
  // transArray is defined elsewhere
  for (let index = 0; index < transArray.length; index++) {
    const element = JSON.stringify(transArray[index]);
    console.log(`sending data to kafka`);
    kprod.send([{
      topic: 'test',
      messages: element
    }],
    (err, data) => {
      if (err) { console.error(err); }
      else {
        tranNumSentToKafka += 1;
        console.log(`data sent: ${JSON.stringify(data)}`);
        console.log(`sent ${tranNumSentToKafka} transactions to kafka`);
      }
    });
  }
});
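As an aside on the loop above: kafka-node's `send()` takes an array of payloads, and each payload's `messages` field can itself be an array, so the whole batch can go out in one request instead of one request per item. A minimal sketch (`buildPayload` is a helper name introduced here for illustration, and `transArray` is assumed to be an array of plain objects as in the question):

```javascript
// Collapse an array of objects into a single kafka-node payload.
// (buildPayload is an illustrative helper, not part of kafka-node.)
function buildPayload(topic, items) {
  return [{ topic, messages: items.map((item) => JSON.stringify(item)) }];
}

// Usage with a producer (assumes `kprod` is a ready kafka-node Producer):
// kprod.send(buildPayload('test', transArray), (err, data) => {
//   if (err) console.error(err);
//   else console.log(`data sent: ${JSON.stringify(data)}`);
// });

console.log(JSON.stringify(buildPayload('test', [{ 0: 15740 }])));
```

Batching this way also avoids firing hundreds of concurrent callbacks while the client is still negotiating its connection.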
When I run this, it sometimes gives a timeout error, like so:
{ TimeoutError: Request timed out after 30000ms
at new TimeoutError (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\errors\TimeoutError.js:6:9)
at Timeout.timeoutId._createTimeout [as _onTimeout] (C:\Users\Yishai Nachaliel\Documents\try\kafka-node-elastic\node_modules\kafka-node\lib\kafkaClient.js:1007:14)
at ontimeout (timers.js:436:11)
at tryOnTimeout (timers.js:300:5)
at listOnTimeout (timers.js:263:5)
at Timer.processTimers (timers.js:223:10) message: 'Request timed out after 30000ms' }
but sometimes it works and it gives me the following output:
...
sent 96 transactions to kafka
data sent: {"test":{"0":15740}}
sent 97 transactions to kafka
data sent: {"test":{"0":15741}}
sent 98 transactions to kafka
data sent: {"test":{"0":15742}}
sent 99 transactions to kafka
data sent: {"test":{"0":15743}}
sent 100 transactions to kafka
This is my simple consumer:
const kafka = require('kafka-node');
const producer = kafka.Producer;
const consumer = kafka.Consumer;

const kclient = new kafka.KafkaClient({
  kafkaHost: '10.0.0.55:9092'
  // kafkaHost: '35.186.191.135:9092'
});
kclient.on('ready', () => {
  console.log(`kclient ready`);
  const kconsumer = new consumer(kclient, [{
    topic: 'test',
    partition: 0
  }]);
  kconsumer.on('error', (err) => {
    console.error(` in kconsumer: \n${err}\n`);
  });
  kconsumer.on('ready', () => {
    console.log(`kconsumer ready`);
    kconsumer.on('message', (msg) => {
      console.log(`received msg: ${msg}`);
    });
  });
});
kclient.on('error', (err) => {
  console.error(`err in kclient: \n${err}\n`);
});
When I run this on my windows machine, I get:
kclient ready
in kconsumer:
TimeoutError: Request timed out after 30000ms
When I run the consumer on the centos machine, I don't get any errors, just a freeze:
kclient ready
and never "kconsumer ready".
Running both in DEBUG mode just shows this:
...
kafka-node:KafkaClient kafka-node-client reconnecting to ADDR:9092 +1s
kafka-node:KafkaClient kafka-node-client createBroker ADDR:9092 +2ms
kafka-node:Consumer connection closed +1s
kafka-node:KafkaClient kafka-node-client socket closed ADDR:9092 (hadError: true) +3ms
...
When I test whether Kafka and ZooKeeper work on my CentOS machine, everything works fine; I tested it using the built-in producer/consumer consoles.
Plus, I checked the ZooKeeper and Kafka logs: there are no errors, but no indication that any producer or consumer has connected, or sent any messages either.
Has anyone encountered this issue with the kafka-node library?
Has anyone found a solution?
Upvotes: 3
Views: 5903
Reputation: 990
I'm not sure if it matches your exact situation, but I had similarly problematic behaviour.
To work with Kafka, your client needs access both to the bootstrap broker (the one at ADDR:9092) AND to the Kafka data node.
When the client connects to the bootstrap broker, it says "I need topic XXX". The broker answers "Topic XXX is on node ADDR-YYY", and the client then tries to connect to ADDR-YYY.
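You can inspect that metadata round trip yourself. A sketch, assuming kafka-node's `loadMetadataForTopics` callback receives the broker map (shaped roughly `{ nodeId: { host, port } }`, which is my assumption) as the first element of its result; `brokerAddresses` is a helper introduced here:

```javascript
// Pull "host:port" strings out of the broker map kafka-node hands back;
// these are the addresses the consumer will actually try to connect to.
function brokerAddresses(brokerMap) {
  return Object.values(brokerMap).map((b) => `${b.host}:${b.port}`);
}

// Usage (assumes a connected kafka-node KafkaClient named `client`):
// client.loadMetadataForTopics(['test'], (err, result) => {
//   if (err) return console.error(err);
//   console.log('advertised brokers:', brokerAddresses(result[0]));
// });

console.log(brokerAddresses({ 0: { nodeId: 0, host: 'kafka-node-1.localhost', port: 9092 } }));
```

If the printed address is unreachable from the consumer machine, that's your failure.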
A possible problem is that the broker returns an address for the Kafka node which is not accessible from your consumer machine. It can either be blocked (e.g. you didn't expose the specific port), or it can be unreachable by name (e.g. the broker returns a local-specific address, like kafka-node-1.localhost, when it should be a raw IP or something resolvable).
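The usual fix for that failure mode is to make the broker advertise an address the client can reach. A sketch of the relevant `server.properties` settings on the broker machine (the 10.0.0.55 address is taken from the question's consumer; treat the exact values as assumptions about the asker's network):

```properties
# config/server.properties on the Kafka (CentOS) machine
# Bind on all interfaces...
listeners=PLAINTEXT://0.0.0.0:9092
# ...but advertise an address the remote client can actually resolve and reach
# (10.0.0.55 is what the question's consumer uses; adjust for your network).
advertised.listeners=PLAINTEXT://10.0.0.55:9092
```

Restart the broker after changing this, and make sure port 9092 is open in the CentOS firewall.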
Upvotes: 1