Reputation: 592
Is it possible to have kafka source module work as a processor module in spring xd runtime? Any code samples?
I am trying to achieve something like this: http (xd source) | kafka source (xd processor)| kafka consumer (xd sink)
I am trying to do this because I have streaming data coming over http which I want to manage with kafka message bus.
My stream definition is this:
stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy
Putting out of the box implementation of kafka source module implementation in processor module of spring xd results in an error like this:
2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'admin:default,admin,singlenode,hsqldbServer:9393.kafkaSourceTest.0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:277) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
Upvotes: 0
Views: 690
Reputation: 4179
I am trying to do this because I have streaming data coming over http which I want to manage with kafka message bus.
If you use kafka as the messagebus (after setting transport), then the stream like "http | log" will have the http messages flow through kafka messagebus. In this case, the topic in Kafka broker would be defined by XD internals.
Is it possible to have kafka source module work as a processor module in spring xd runtime?
No, a source module can not act as a processor module. If you want the messages flow through specific topic in Kafka then you could have a stream that has a kafka sink module that receives data from http source and another stream that configures the kafka source module with the same topic.
and, this can be achieved this way:
stream create KafkaSink --definition "http --outputType=application/json | kafka --brokerList= --topic=kafkaTestTopic" --deploy
stream create KafkaSource --definition "kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log" --deploy
Upvotes: 3