Reputation: 33
The documentation for ReplyingKafkaTemplate, which provides Request-Reply support (introduced in Spring-Kafka 2.1.3), suggests that different types may be used for the Request and the Reply:
ReplyingKafkaTemplate<K, V, R>
where the parameterised type K designates the message Key, V designates the Value (i.e. the Request), and R designates the Reply.
So far, so good. But the corresponding supporting classes for implementing the server side of Request-Reply don't seem to support different types for V and R. The documentation suggests using a KafkaListener with an added @SendTo annotation, which behind the scenes uses a replyTemplate configured on the MessageListenerContainer. But AbstractKafkaListenerEndpoint only supports a single value type for both the listener and the replyTemplate:
public abstract class AbstractKafkaListenerEndpoint<K, V>
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {

    ...

    /**
     * Set the {@link KafkaTemplate} to use to send replies.
     * @param replyTemplate the template.
     * @since 2.0
     */
    public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    ...

}
Hence V and R need to be the same type.
The example used in the documentation indeed uses String for both Request and Reply.
Am I missing something, or is this a design flaw in the Spring-Kafka Request-Reply support that should be reported and corrected?
Upvotes: 3
Views: 3581
Reputation: 174789
This is fixed in the 2.2 release.
For earlier versions, simply inject a raw KafkaTemplate (with no generics).
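To illustrate what "raw" buys you here, below is a minimal sketch in plain Java. It does not use the real Spring classes: Template and Endpoint are hypothetical stand-ins for KafkaTemplate and a pre-2.2 listener endpoint, and the reply() method is an assumption made only so the sketch can show a reply being sent. The point is that a setter typed as Template<K, V> rejects a template with a different value type, while the same template passed as a raw type is accepted with an unchecked warning.

```java
import java.util.ArrayList;
import java.util.List;

public class RawTemplateSketch {

    // Hypothetical stand-in for KafkaTemplate<K, V>; it only records what was "sent".
    static class Template<K, V> {
        final List<Object> sent = new ArrayList<>();

        void send(K key, V value) {
            sent.add(value);
        }
    }

    // Hypothetical stand-in for a pre-2.2 endpoint: the reply template
    // shares the listener's own <K, V> parameters.
    static class Endpoint<K, V> {
        private Template<K, V> replyTemplate;

        void setReplyTemplate(Template<K, V> replyTemplate) {
            this.replyTemplate = replyTemplate;
        }

        // Assumed for illustration: the reply goes out through an unchecked call,
        // so the reply's runtime type is not tied to V.
        @SuppressWarnings({"rawtypes", "unchecked"})
        void reply(K key, Object reply) {
            ((Template) this.replyTemplate).send(key, reply);
        }
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object demo() {
        Endpoint<String, Integer> endpoint = new Endpoint<>(); // requests are Integer
        Template<String, String> typed = new Template<>();     // replies are String

        // endpoint.setReplyTemplate(typed);
        // ^ does not compile: Template<String, String> is not a Template<String, Integer>

        Template raw = typed;           // drop the generics
        endpoint.setReplyTemplate(raw); // compiles, with an unchecked warning
        endpoint.reply("key", "REPLY");
        return typed.sent.get(0);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In a Spring application the equivalent would be declaring the injected reply template parameter as plain KafkaTemplate rather than KafkaTemplate<String, Bar>; the compiler will warn about the unchecked conversion, which is the price of the workaround.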
EDIT
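import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
public class So53151961Application {

    public static void main(String[] args) {
        SpringApplication.run(So53151961Application.class, args);
    }

    // Server side: receives a Foo request and returns a Bar reply.
    @KafkaListener(id = "so53151961", topics = "so53151961")
    @SendTo
    public Bar handle(Foo foo) {
        System.out.println(foo);
        return new Bar(foo.getValue().toUpperCase());
    }

    // Client side: sends Foo requests and listens for Bar replies on the reply topic.
    @Bean
    public ReplyingKafkaTemplate<String, Foo, Bar> replyingTemplate(ProducerFactory<String, Foo> pf,
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

        ConcurrentMessageListenerContainer<String, Bar> replyContainer =
                factory.createContainer("so53151961-replyTopic");
        replyContainer.getContainerProperties().setGroupId("so53151961.reply");
        ReplyingKafkaTemplate<String, Foo, Bar> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer);
        return replyingKafkaTemplate;
    }

    // Template the listener container factory uses to send the @SendTo replies.
    @Bean
    public KafkaTemplate<String, Bar> replyTemplate(ProducerFactory<String, Bar> pf,
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

        KafkaTemplate<String, Bar> kafkaTemplate = new KafkaTemplate<>(pf);
        factory.setReplyTemplate(kafkaTemplate);
        return kafkaTemplate;
    }

    // Sends one request at startup and waits up to 10 seconds for the reply.
    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Bar> template) {
        return args -> {
            ProducerRecord<String, Foo> record = new ProducerRecord<>("so53151961", null, "key", new Foo("foo"));
            RequestReplyFuture<String, Foo, Bar> future = template.sendAndReceive(record);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so53151961", 1, (short) 1);
    }

    @Bean
    public NewTopic reply() {
        return new NewTopic("so53151961-replyTopic", 1, (short) 1);
    }

    // Request payload.
    public static class Foo {

        public String value;

        public Foo() {
            super();
        }

        public Foo(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Foo [value=" + this.value + "]";
        }

    }

    // Reply payload.
    public static class Bar {

        public String value;

        public Bar() {
            super();
        }

        public Bar(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Bar [value=" + this.value + "]";
        }

    }

}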
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example
result
Foo [value=foo]
Bar [value=FOO]
Upvotes: 1