Björn Beskow

Reputation: 33

spring-kafka Request Reply: Different Types for Request and Reply

The documentation for ReplyingKafkaTemplate, which provides Request-Reply support (introduced in Spring-Kafka 2.1.3), suggests that different types may be used for the Request and Reply:

ReplyingKafkaTemplate<K, V, R>

where the type parameter K designates the message Key, V designates the Value (i.e. the Request), and R designates the Reply.

So far, so good. But the corresponding supporting classes for implementing the server side of Request-Reply don't seem to support different types for V and R. The documentation suggests using a @KafkaListener with an added @SendTo annotation, which behind the scenes uses a replyTemplate configured on the MessageListenerContainer. But AbstractKafkaListenerEndpoint only supports a single value type for both the listener and the replyTemplate:

public abstract class AbstractKafkaListenerEndpoint<K, V>
        implements KafkaListenerEndpoint, BeanFactoryAware, InitializingBean {

    ...

    /**
     * Set the {@link KafkaTemplate} to use to send replies.
     * @param replyTemplate the template.
     * @since 2.0
     */
    public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
        this.replyTemplate = replyTemplate;
    }

    ...

}

Hence V and R need to be the same type.

The example used in the documentation indeed uses String for both Request and Reply.

Am I missing something, or is this a design flaw in the Spring-Kafka Request-Reply support that should be reported and corrected?

Upvotes: 3

Views: 3581

Answers (1)

Gary Russell

Reputation: 174789

This is fixed in the 2.2 release.

For earlier versions, simply inject a raw KafkaTemplate (with no generics).
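To illustrate why the raw type gets around the restriction, here is a minimal plain-Java sketch. It does not use Spring at all: `Template` and `Endpoint` are hypothetical stand-ins for `KafkaTemplate<K, V>` and the pre-2.2 `AbstractKafkaListenerEndpoint<K, V>`, and the `Foo`/`Bar` classes are simplified. The only point is that a raw reference is accepted by a setter that expects `Template<K, V>` (with an unchecked warning), and that erasure means nothing fails at runtime.

```java
import java.util.ArrayList;
import java.util.List;

public class RawReplyTemplateSketch {

    /** Hypothetical stand-in for KafkaTemplate<K, V>; it only records what was sent. */
    static class Template<K, V> {
        final List<Object> sent = new ArrayList<>();

        void send(K key, V value) {
            this.sent.add(value);
        }
    }

    /** Hypothetical stand-in for the pre-2.2 endpoint: a single V for listener and reply template. */
    static class Endpoint<K, V> {
        private Template<K, V> replyTemplate;

        void setReplyTemplate(Template<K, V> replyTemplate) {
            this.replyTemplate = replyTemplate;
        }

        @SuppressWarnings("unchecked")
        void reply(K key, Object reply) {
            // V is erased at runtime, so this cast performs no check.
            this.replyTemplate.send(key, (V) reply);
        }
    }

    /** Simplified request type. */
    static class Foo {
        final String value;

        Foo(String value) {
            this.value = value;
        }
    }

    /** Simplified reply type. */
    static class Bar {
        final String value;

        Bar(String value) {
            this.value = value;
        }
    }

    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object demo() {
        Endpoint<String, Foo> endpoint = new Endpoint<>();
        Template<String, Bar> typed = new Template<>();

        // endpoint.setReplyTemplate(typed);
        // The line above would not compile: Template<String, Bar> is not a Template<String, Foo>.

        Template raw = typed;            // drop the generics
        endpoint.setReplyTemplate(raw);  // unchecked conversion: compiles with a warning

        endpoint.reply("key", new Bar("FOO"));
        return typed.sent.get(0);        // the Bar instance that was "sent"
    }

    public static void main(String[] args) {
        Object reply = demo();
        System.out.println(reply.getClass().getSimpleName());
    }
}
```

The trade-off is that the compiler no longer checks the reply type, so a mismatch between the listener's return type and the template's serializer would only show up at runtime.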

EDIT

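package com.example;

import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication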
public class So53151961Application {

    public static void main(String[] args) {
        SpringApplication.run(So53151961Application.class, args);
    }

    @KafkaListener(id = "so53151961", topics = "so53151961")
    @SendTo
    public Bar handle(Foo foo) {
        System.out.println(foo);
        return new Bar(foo.getValue().toUpperCase());
    }

    @Bean
    public ReplyingKafkaTemplate<String, Foo, Bar> replyingTemplate(ProducerFactory<String, Foo> pf,
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

        ConcurrentMessageListenerContainer<String, Bar> replyContainer =
                factory.createContainer("so53151961-replyTopic");
        replyContainer.getContainerProperties().setGroupId("so53151961.reply");
        ReplyingKafkaTemplate<String, Foo, Bar> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(pf, replyContainer);
        return replyingKafkaTemplate;
    }

    @Bean
    public KafkaTemplate<String, Bar> replyTemplate(ProducerFactory<String, Bar> pf,
            ConcurrentKafkaListenerContainerFactory<String, Bar> factory) {

        KafkaTemplate<String, Bar> kafkaTemplate = new KafkaTemplate<>(pf);
        factory.setReplyTemplate(kafkaTemplate);
        return kafkaTemplate;
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Bar> template) {
        return args -> {
            ProducerRecord<String, Foo> record = new ProducerRecord<>("so53151961", null, "key", new Foo("foo"));
            RequestReplyFuture<String, Foo, Bar> future = template.sendAndReceive(record);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so53151961", 1, (short) 1);
    }

    @Bean
    public NewTopic reply() {
        return new NewTopic("so53151961-replyTopic", 1, (short) 1);
    }

    public static class Foo {

        public String value;

        public Foo() {
            super();
        }

        public Foo(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Foo [value=" + this.value + "]";
        }

    }

    public static class Bar {

        public String value;

        public Bar() {
            super();
        }

        public Bar(String value) {
            this.value = value;
        }

        public String getValue() {
            return this.value;
        }

        public void setValue(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Bar [value=" + this.value + "]";
        }

    }

}

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example

Result:

Foo [value=foo]
Bar [value=FOO]

Upvotes: 1
