Neeraj
Reputation: 592

How to get Kafka to consume HTTP streaming data on the Spring XD runtime?

Is it possible to have the Kafka source module work as a processor module in the Spring XD runtime? Any code samples?

I am trying to achieve something like this: http (XD source) | kafka source (XD processor) | kafka consumer (XD sink)

I am trying to do this because I have streaming data coming in over HTTP which I want to manage with the Kafka message bus.

My stream definition is this:

stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy

Placing the out-of-the-box Kafka source module in the processor position of a Spring XD stream results in an error like this:

2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'admin:default,admin,singlenode,hsqldbServer:9393.kafkaSourceTest.0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)

Upvotes: 0

Views: 690

Answers (1)

Ilayaperumal Gopinathan
Reputation: 4179

> I am trying to do this because I have streaming data coming over http which I want to manage with kafka message bus.

If you use Kafka as the message bus (by setting the XD transport to Kafka), then a stream such as "http | log" will have its HTTP messages flow through the Kafka message bus. In this case, the topics on the Kafka broker are chosen by XD internally.
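
As a rough sketch, assuming a single-node install of an XD release that ships the Kafka message bus, the transport can be selected when starting the runtime. The exact option, and the broker and ZooKeeper settings in servers.yml, should be checked against the documentation for your version:

```
xd-singlenode --transport kafka
```

With that transport in place, the stream itself needs no explicit kafka module (the stream name httpViaBus is only a placeholder):

```
stream create httpViaBus --definition "http --outputType=application/json | log" --deploy
```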

> Is it possible to have kafka source module work as a processor module in spring xd runtime?

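No, a source module cannot act as a processor module. A source module only defines an output channel, so nothing subscribes to the output of the upstream http module, which is why the dispatcher reports that it has no subscribers. If you want the messages to flow through a specific Kafka topic, you can create one stream in which a kafka sink module receives data from the http source, and another stream that configures the kafka source module with the same topic.

This can be achieved as follows: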

stream create KafkaSink --definition "http --outputType=application/json | kafka --brokerList= --topic=kafkaTestTopic" --deploy

stream create KafkaSource --definition "kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log" --deploy
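
To check the two streams end to end, you could post a message to the http source from the XD shell. This is a minimal sketch that assumes the http source is listening on its default port (9000); the payload is only a placeholder:

```
http post --target http://localhost:9000 --data "hello kafka"
```

If both streams are deployed and the broker is reachable, the message should show up in the log output of the KafkaSource stream after passing through kafkaTestTopic.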

Upvotes: 3
