Reputation: 76
I am sending messages to a topic and can read them using the Kafka console consumer and a Java consumer run from a main method. However, the messages are not being read by the consumer configured in XML. Please help me resolve this issue. I am using int-kafka:message-driven-channel-adapter with spring-kafka version 1.3.0.RELEASE.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:stream="http://www.springframework.org/schema/integration/stream"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <int:channel id="inputFromKafka">
        <int:queue />
    </int:channel>
    <bean id="zkConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
        <constructor-arg ref="zookeeperConnect" />
    </bean>
    <bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
        <constructor-arg ref="zkConfiguration" />
    </bean>
    <bean id="decoder"
        class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
    <int-kafka:message-driven-channel-adapter id="messageProcessor"
        channel="inputFromKafka"
        connection-factory="kafkaConnectionFactory"
        queue-size="${queue.size:1024}"
        concurrency="${concurrency:50}"
        auto-startup="true"
        payload-decoder="decoder"
        topics="${topics:nishant}" key-decoder="decoder" />
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="localhost:2181" zk-connection-timeout="6000"
        zk-session-timeout="6000" zk-sync-time="2000" />
</beans>
Consumer Java code
@Component(value = "messageProcessor")
public class MessageConsumer {
    public void reader(Map<String, Map<Integer, String>> payload) {
        System.out.println("TEST");
    }
}
Upvotes: 1
Views: 1392
Reputation: 174554
There are several problems with your configuration.
First, your "consumer" and the channel adapter have the same bean name (id), messageProcessor. With Spring, the last definition wins, so one definition will overwrite the other.
Second, all that configuration does is dump the messages into the queue channel inputFromKafka; there is no connection to your "consumer". Change the bean name and add a service activator (and remove the <int:queue/> element from the channel).
<int:channel id="inputFromKafka" />
<int:service-activator ref="myConsumer" input-channel="inputFromKafka" />
This will route the messages from the adapter to your consumer bean method (assuming the payload can be converted to your required type).
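Putting both changes together, the relevant part of the configuration might look like the sketch below. It assumes the consumer is renamed to myConsumer (the name used in the snippet above) and declared as a plain bean; the package com.example is a placeholder, not something taken from the question. The method attribute is optional when the bean has a single public method, but it makes the target explicit.

```xml
<!-- Consumer bean under a name that no longer clashes with the adapter id.
     The package "com.example" is a placeholder; substitute the real one. -->
<bean id="myConsumer" class="com.example.MessageConsumer" />

<!-- Direct channel: without the queue element, messages are handed
     straight to the subscriber instead of waiting to be polled. -->
<int:channel id="inputFromKafka" />

<!-- Calls MessageConsumer.reader(...) for each message arriving on the channel. -->
<int:service-activator ref="myConsumer" method="reader"
    input-channel="inputFromKafka" />
```

If the class is picked up by component scanning instead, changing the annotation to @Component("myConsumer") and omitting the bean element should have the same effect.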
Upvotes: 1