Nishant Raj
Nishant Raj

Reputation: 76

Kafka-spring consumer is not reading message

I am sending message on topics and can read same message using kafka console and Java consumer code using main method. But message are not getting read from consumer configuration xml. Please help to resolve this issue. I am using message int-kafka:message-driven-channel-adapter. spring-kafka version 1.3.0.RELEASE

         <?xml version="1.0" encoding="UTF-8"?>
        <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
    http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka-1.0.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.0.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">


           <int:channel id="inputFromKafka">
            <int:queue />
           </int:channel>


           <bean id="zkConfiguration"  class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
    <constructor-arg ref="zookeeperConnect" />
</bean>

<bean id="kafkaConnectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory" >
    <constructor-arg ref="zkConfiguration" />
</bean>

  <bean id="decoder"
class="org.springframework.integration.kafka.serializer.common.StringDecoder" />
<int-kafka:message-driven-channel-adapter id="messageProcessor" 
    channel="inputFromKafka"
    connection-factory="kafkaConnectionFactory"
    queue-size="${queue.size:1024}"
        concurrency="${concurrency:50}"
    auto-startup="true"
    payload-decoder="decoder"
    topics="${topics:nishant}" key-decoder="decoder" />
     <int-kafka:zookeeper-connect id="zookeeperConnect" 
  zk-connect="localhost:2181" zk-connection-timeout="6000"
    zk-session-timeout="6000" zk-sync-time="2000" />


         </beans>

Consumer Java code

              @Component(value = "messageProcessor")
              public class MessageConsumer {
            public void reader(Map<String, Map<Integer, String>> payload) {
            System.out.println("TEST");

             }
        }

Upvotes: 1

Views: 1392

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

There are several problems with your configuration.

First your "consumer" and channel adapter have the same bean name (id) messageProcessor. With Spring, the last one wins so one definition will overwrite the other.

Second, all that configuration does is dump the messages into a queue channel inputFromKafka - there is no connection to your "consumer". Change the bean name and add a service activator (and remove the <int:queue/> element from the channel).

<int:channel id="inputFromKafka" />

<int:service-activator ref="myConsumer" input-channel="inputFromKafka" />

Will route the messages from the adapter to your consumer bean method (assuming the payload can be converted to your required type).

Upvotes: 1

Related Questions