Reputation: 79
I am building a simple Node.js REST API which will produce data from an Apache Kafka stream.
The API should produce JSON, which I am having trouble achieving.
Can anyone help me get a JSON array containing each message, like [{},{},{}]?
(Or is there a better approach to publishing a Kafka stream so that it can be consumed by, for example, Power BI?)
My current output looks like:
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
Output expected like:
[
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
,
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
]
My code:
const express = require('express');
const router = express.Router();
var http = require('http');
const kafka = require('kafka-node');

const Consumer = kafka.Consumer;
const client = new kafka.Client();

// Consume partition 0 of the 'twitterFeeds' topic without auto-committing offsets.
const consumer = new Consumer(
  client,
  [{ topic: 'twitterFeeds', partition: 0 }],
  { autoCommit: false }
);

// Every message received so far, kept as objects (not pre-stringified text) so
// the whole collection can be serialized as one valid JSON array.
// NOTE(review): this grows without bound for as long as the process runs —
// consider capping or paging it for a long-lived stream.
const messages = [];

consumer.on('message', (message) => {
  messages.push(message);
});

// Without an 'error' listener an emitted error is thrown as an unhandled
// EventEmitter error and takes the process down.
consumer.on('error', (err) => {
  console.error('Kafka consumer error:', err);
});

/**
 * GET / — responds with all messages consumed so far as a JSON array,
 * i.e. `[{...},{...}]` rather than back-to-back `{...}{...}` objects.
 */
router.get('/', (req, res) => {
  res.json(messages);
});

module.exports = router;
Upvotes: 2
Views: 1566
Reputation: 2658
I'm currently working on a similar project (if you'd be interested, help would be appreciated) - scramjet-kafka
I have already solved the above problem in scramjet itself, with DataStream.toJSONArray - it simply streams your data as a JSON array. In your case it would be quite simple:
// Open the Kafka topic as a readable stream of messages.
const kafkaMessages = new ConsumerStream(
  client,
  [{ topic: 'twitterFeeds', partition: 0 }],
  { autoCommit: false }
);

// Hand the messages over to a scramjet DataStream.
const dataStream = kafkaMessages.pipe(new scramjet.DataStream());
// you may want to do some transforms here

// Stream the data out as a JSON array.
const jsonArrayStream = dataStream.toJSONArray();

// or pipe it to any other stream, accumulate and so on.
jsonArrayStream.pipe(process.stdout);
If you like the project and would be able to help, please get in touch via GitHub.
Upvotes: 2