Reputation: 3898
I am using Spring Integration Kafka and trying to send messages to a topic named "test" through an outbound-channel-adapter.
From the command line, I started ZooKeeper and Kafka and created a topic named "test".
Spring XML configuration
<int:publish-subscribe-channel id="inputToKafka" />
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
    auto-startup="false"
    channel="inputToKafka"
    kafka-template="template"
    sync="true"
    topic="test">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
JUnit Test Code
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {
    "classpath:kafka-outbound-context.xml"
})
public class ProducerTest {
    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel channel;

    @Test
    public void test_send_message() {
        channel.send(MessageBuilder.withPayload("Test Message")
            .setHeader(KafkaHeaders.TOPIC, "test").build());
    }
}
The test case succeeds, and when debugging I find that channel.send() returns true.
I inspect the topic from the command line with the command below, but I don't see any messages in the test topic.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Can somebody explain why I don't see any messages on my test topic?
Upvotes: 0
Views: 3005
Reputation: 174494
Have you looked in the logs? You need to configure key and value serializers, otherwise you'll get
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.
When using Java:
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
The map keys are key.serializer and value.serializer.
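To make the key names concrete, here is a minimal sketch of the full producer configuration map written with plain string keys instead of the ProducerConfig constants. It assumes String keys and payloads, so StringSerializer is used for both. The class name ProducerProps and its method are hypothetical, chosen only for illustration. The serializers are given as fully qualified class names, which Kafka's configuration should accept in place of Class objects, so the sketch needs nothing beyond the JDK to compile.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class, not part of Spring or Kafka.
public class ProducerProps {

    // Builds the producer configuration with plain string keys.
    // Assumption: keys and values are Strings, hence StringSerializer for both.
    public static Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        // The two entries missing from the configuration in the question.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print each key/value pair for inspection.
        producerConfig().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Each put call corresponds to one entry element in the XML map passed to DefaultKafkaProducerFactory, so the same two key/value pairs can be added there next to bootstrap.servers.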
Upvotes: 1