Reputation: 141
I would like to consume messages using int-kafka:message-driven-channel-adapter.
The messages are produced to the following topics, selected via headers['topic']: inMsge_topic and emailMesge_topic.
I could not find a good example of this online. Please suggest an approach.
Consuming works fine with int-kafka:inbound-channel-adapter, but that requires polling, which I want to avoid.
This is the configuration I am using:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
        kafka-producer-context-ref="kafkaProducerContext"
        auto-startup="true" channel="inputToKafka" message-key="kafka_messageKey">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor" />
</int-kafka:outbound-channel-adapter>

<int-kafka:producer-context id="kafkaProducerContext"
        producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration
                broker-list="${kafka.producer.brokerList}" topic="headers['topic']"
                key-class-type="java.lang.String"
                value-class-type="com.vo.MessageVO"
                value-encoder="kafkaEncoder" key-encoder="kafkaKeyEncoder"
                compression-type="none" />
    </int-kafka:producer-configurations>
</int-kafka:producer-context>

<int:channel id="inputToKafka">
    <int:queue />
</int:channel>

<int:channel id="inputFromKafka">
</int:channel>
<bean id="kafkaConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
    <constructor-arg ref="zookeeperConnect"/>
</bean>

<bean id="connectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
    <constructor-arg ref="kafkaConfiguration"/>
</bean>
<int-kafka:message-driven-channel-adapter
        id="adapter"
        channel="inputFromKafka"
        connection-factory="connectionFactory"
        key-decoder="kafkaKeyDecoder"
        payload-decoder="kafkaDecoder"
        max-fetch="100"
        topics="inMsge_topic"/>

<int-kafka:message-driven-channel-adapter
        id="adapter1"
        channel="inputFromKafka"
        connection-factory="connectionFactory"
        key-decoder="kafkaKeyDecoder"
        payload-decoder="kafkaDecoder"
        max-fetch="100"
        topics="emailMesge_topic"/>

<int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="localhost:2181" zk-connection-timeout="6000"
        zk-session-timeout="400" zk-sync-time="200" />
Log output:
20:01:13.130 [pool-5-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=2]@0
20:01:13.131 [pool-5-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=4]@0
20:01:13.131 [pool-5-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=1]@0
20:01:13.131 [pool-5-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=0]@0
20:01:13.131 [pool-5-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=3]@1913
20:01:13.134 [pool-11-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=1]@0
20:01:13.134 [pool-11-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=3]@1913
20:01:13.134 [pool-11-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=4]@0
20:01:13.134 [pool-11-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=2]@0
20:01:13.134 [pool-11-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='inMsge_topic', id=0]@0
20:01:13.158 [pool-7-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=1]@0
20:01:13.158 [pool-7-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=3]@334
20:01:13.158 [pool-7-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=0]@0
20:01:13.158 [pool-7-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=4]@0
20:01:13.158 [pool-7-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=2]@0
20:01:13.164 [pool-13-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=1]@0
20:01:13.164 [pool-13-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=4]@0
20:01:13.164 [pool-13-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=0]@0
20:01:13.164 [pool-13-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=3]@334
20:01:13.164 [pool-13-thread-1] DEBUG o.s.i.kafka.core.DefaultConnection - Reading from Partition[topic='emailMesge_topic', id=2]@0
Upvotes: 1
Views: 1794
Reputation: 174799
Take a look at the Spring Integration Kafka sample. It uses Java configuration instead of XML, but it shows both the outbound and the message-driven adapters.
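If you want to stay with XML, one thing worth checking is that the configuration in the question does not show any subscriber on the inputFromKafka channel. The message-driven adapter pushes messages to that channel as they are fetched, so something has to consume them. A minimal sketch of such a subscriber is below. The kafkaMessageHandler bean, its class com.example.KafkaMessageHandler, and the handle method are hypothetical names used only for illustration; they are not part of the question or of the sample.

```xml
<!-- Hypothetical handler bean: com.example.KafkaMessageHandler is an assumed class
     exposing a public method handle(...) that accepts the decoded payload. -->
<bean id="kafkaMessageHandler" class="com.example.KafkaMessageHandler"/>

<!-- Subscribes the handler to the channel that both message-driven adapters write to.
     No poller is needed because inputFromKafka is a direct (subscribable) channel. -->
<int:service-activator input-channel="inputFromKafka"
        ref="kafkaMessageHandler"
        method="handle"/>
```

With a subscriber like this in place, each message read from inMsge_topic or emailMesge_topic should be delivered to the handler on the adapter's own threads, without any polling on the consumer side.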
Upvotes: 1