Robert Lepen
Robert Lepen

Reputation: 79

Rest API Kafka stream

I am building a simple node.js rest API witch will produce data from Apache Kafka stream. The API should produce JSON, witch I am having problem to achive. Any help how to get JSON array of each message like [{},{},{}]? (Or any better approach how to public Kafka stream and be able to consume Kafka stream for example by Power BI?)

My current output looks like:

{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS  #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}

Output expected like:

[
{"topic":"twitterFeeds","value":"RT @flyingtsunami: @SethAbramson **READ THIS UNTIL IT SINKS IN** #POTUS  #Comey's firing (he was); T…","offset":0,"partition":0,"highWaterOffset":1906,"key":null}
,
{"topic":"twitterFeeds","value":"RT @RCorbettMEP: Why does the @BBC news only …","offset":1,"partition":0,"highWaterOffset":1906,"key":null}
]

My code:

const express = require('express');
const router = express.Router();
    var output1 = '';
var http = require('http');  
var kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    client = new kafka.Client(),
    consumer = new Consumer(
        client,
        [
        { topic: 'twitterFeeds', partition: 0 }
        ],
        {
            autoCommit: false
        }
    );
    var output2 = consumer.on('message', function (message) {
     obj = JSON.stringify(message);
     output1 = output1 + obj;
    });

router.get('/',(req,res)=>{
        res.send(output1);
    });
module.exports = router;

Upvotes: 2

Views: 1566

Answers (1)

Michał Karpacki
Michał Karpacki

Reputation: 2658

I'm currently working on a similar project (if you'd be interested, help would be appreciated) - scramjet-kafka

I have already solved the above problem in scramjet DataStream.toJSONArray itself - it simply streams your data as a JSON Array. In your case it would be quite simply:

new ConsumerStream(
    client,
    [ { topic: 'twitterFeeds', partition: 0 } ],
    { autoCommit: false }
)
    .pipe(new scramjet.DataStream)
    // you may want to do some transforms here
    .toJSONArray()
    .pipe(process.stdout)
    // or pipe it to any other stream, accumulate and so on.

If you like the project and you could help - please get in touch via Github.

Upvotes: 2

Related Questions