RayS

Reputation: 41

Using Spring Kafka ReplyingKafkaTemplate with Kafka Streams app

I would like a client app with request/response semantics to invoke another app that is a Kafka Streams app.

My client app is based on this example (essentially unchanged). I need the app that receives the client's messages to be a Kafka Streams app. However, the message headers, including the correlation id, are lost.

The Kafka Streams app is a simple topology for testing this:

    @Bean
    public KafkaStreams stream(KafkaStreamsConfiguration kafkaStreamsConfiguration) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream(REQUEST_TOPIC_NAME)
                .groupByKey()
                .count()
                .toStream()
                .mapValues((ValueMapper<Long, String>)String::valueOf)
                .to(REPLY_TOPIC_NAME);

        return new KafkaStreams(builder.build(), kafkaStreamsConfiguration.asProperties());
    }

For this POC I'm keeping it simple: the client and server "agree" on the topic names (kRequests and kReplies). At this point I just want the correlation id to be recognized and returned.

What I'm seeing now is:

2019-10-01 10:55:38.792  WARN 76830 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate            : Reply timed out for: ProducerRecord(topic=kRequests, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [107, 82, 101, 112, 108, 105, 101, 115]), RecordHeader(key = kafka_correlationId, value = [101, -4, -35, 41, -127, -66, 69, 37, -117, -127, -95, -92, 38, 79, 73, 127])], isReadOnly = true), key=null, value=foo21074, timestamp=null) with correlationId: [135564972083657938538225367552235620735]
2019-10-01 10:55:38.792 ERROR 76830 --- [TaskScheduler-1] org.KRequestingApplication  : Reply timed out

org.springframework.kafka.KafkaException: Reply timed out
    at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$0(ReplyingKafkaTemplate.java:257) ~[spring-kafka-2.2.8.RELEASE.jar:2.2.8.RELEASE]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_211]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_211]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_211]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]

No message with the matching correlation id arrives on the reply topic within the timeout. It seems that, at least with the Kafka Streams DSL, there is no way to support ReplyingKafkaTemplate.
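
For anyone reading the timeout log above, the header values are printed as raw signed bytes. Here is a minimal sketch in plain Java (no Kafka dependency) that decodes them. The class and method names are hypothetical, and it assumes the bracketed correlationId in the log is simply the header bytes read as a signed big-endian integer.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes the two header values copied from the log above.
class HeaderDecoder {

    // The kafka_replyTopic header is assumed to hold the topic name as UTF-8 text.
    static String decodeReplyTopic(byte[] value) {
        return new String(value, StandardCharsets.UTF_8);
    }

    // Assumption: the number in "with correlationId: [...]" is the header bytes
    // interpreted as a signed big-endian integer.
    static BigInteger decodeCorrelationId(byte[] value) {
        return new BigInteger(value);
    }

    public static void main(String[] args) {
        byte[] replyTopic = {107, 82, 101, 112, 108, 105, 101, 115};
        byte[] correlationId = {101, -4, -35, 41, -127, -66, 69, 37,
                -117, -127, -95, -92, 38, 79, 73, 127};

        System.out.println(decodeReplyTopic(replyTopic)); // prints "kReplies"
        System.out.println(decodeCorrelationId(correlationId));
    }
}
```

So the request does carry both the reply topic and a 16-byte correlation id. What the client is waiting for is a record on kReplies that carries the same kafka_correlationId header.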

Upvotes: 0

Views: 1596

Answers (1)

Gary Russell

Reputation: 174494

Your scenario doesn't make sense: your KStream is grouping multiple inputs, whereas request/reply is one request, one reply.

This works fine:

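    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.annotation.EnableKafkaStreams;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.ProducerFactory;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;

    @SpringBootApplication
    @EnableKafkaStreams
    public class So58193901Application {

        private static final String REQUEST_TOPIC_NAME = "requests";

        private static final String REPLY_TOPIC_NAME = "replies";

        public static void main(String[] args) {
            SpringApplication.run(So58193901Application.class, args);
        }

        // One record in, one record out: upper-case the value and send it to the reply topic.
        @Bean
        public KStream<byte[], byte[]> stream(StreamsBuilder builder) {
            KStream<byte[], byte[]> stream = builder.stream(REQUEST_TOPIC_NAME);
            stream
                    .mapValues(val -> new String(val).toUpperCase().getBytes())
                    .to(REPLY_TOPIC_NAME);
            return stream;
        }

        @Bean
        public NewTopic topic1() {
            return TopicBuilder.name(REQUEST_TOPIC_NAME).partitions(1).replicas(1).build();
        }

        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name(REPLY_TOPIC_NAME).partitions(1).replicas(1).build();
        }

        // The reply container listens on the reply topic on behalf of the template.
        @Bean
        public ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,
                ConcurrentKafkaListenerContainerFactory<String, String> factory) {

            ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer(REPLY_TOPIC_NAME);
            return new ReplyingKafkaTemplate<>(pf, replyContainer);
        }

        // Send one request and wait up to 10 seconds for its reply.
        @Bean
        public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
            return args -> {
                System.out.println(template.sendAndReceive(new ProducerRecord<>(REQUEST_TOPIC_NAME, "bar", "foo"))
                        .get(10, TimeUnit.SECONDS).value());
            };
        }

    }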
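
To illustrate why a one-to-one reply matters, here is a minimal sketch in plain Java of request/reply correlation. It is a simplified, hypothetical model (the class and method names are made up, and it is not Spring Kafka's code), assuming the requester keeps a map of pending requests keyed by correlation id and completes each one when a reply with the same id arrives.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of request/reply correlation; not Spring Kafka's implementation.
class CorrelationModel {

    // Pending requests, keyed by the correlation id sent with each request.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a request and returns the future that its reply will complete.
    CompletableFuture<String> send(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Delivers a reply; returns false when no pending request matches it.
    boolean onReply(String correlationId, String value) {
        if (correlationId == null) {
            return false; // the reply arrived without a correlation id
        }
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false; // unknown or already answered request
        }
        future.complete(value);
        return true;
    }

    public static void main(String[] args) {
        CorrelationModel model = new CorrelationModel();
        CompletableFuture<String> reply = model.send("id-1");

        System.out.println(model.onReply(null, "3"));     // prints "false"
        System.out.println(reply.isDone());               // prints "false"
        System.out.println(model.onReply("id-1", "FOO")); // prints "true"
        System.out.println(reply.join());                 // prints "FOO"
    }
}
```

In this model a reply that arrives without the request's correlation id, as the question reports for the aggregated output, never completes the pending future, so the caller can only time out. A per-record transformation such as the mapValues topology above produces exactly one reply per request, which is the shape request/reply expects.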

Upvotes: 1
