Reputation: 41
I would like to have a client app with request/response semantics which invokes another app that's a Kafka Streams app.
My client app is based on this example (essentially unchanged). I need the app receiving the messages from the client to be a Kafka Streams app. But the message headers including the correlation id are lost.
The Kafka Streams app is a simple topology for testing this...
@Bean
public KafkaStreams stream(KafkaStreamsConfiguration kafkaStreamsConfiguration) {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(REQUEST_TOPIC_NAME)
.groupByKey()
.count()
.toStream()
.mapValues((ValueMapper<Long, String>)String::valueOf)
.to(REPLY_TOPIC_NAME);
return new KafkaStreams(builder.build(), kafkaStreamsConfiguration.asProperties());
}
For this POC I'm keeping it simple and having the client and server "agree" on the topic names (kRequests
and kReplies
). So at this point I just want to get the correlation id to be recognized and returned.
What I'm seeing now is
2019-10-01 10:55:38.792 WARN 76830 --- [TaskScheduler-1] o.s.k.r.ReplyingKafkaTemplate : Reply timed out for: ProducerRecord(topic=kRequests, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = kafka_replyTopic, value = [107, 82, 101, 112, 108, 105, 101, 115]), RecordHeader(key = kafka_correlationId, value = [101, -4, -35, 41, -127, -66, 69, 37, -117, -127, -95, -92, 38, 79, 73, 127])], isReadOnly = true), key=null, value=foo21074, timestamp=null) with correlationId: [135564972083657938538225367552235620735]
2019-10-01 10:55:38.792 ERROR 76830 --- [TaskScheduler-1] org.KRequestingApplication : Reply timed out
org.springframework.kafka.KafkaException: Reply timed out
at org.springframework.kafka.requestreply.ReplyingKafkaTemplate.lambda$scheduleTimeout$0(ReplyingKafkaTemplate.java:257) ~[spring-kafka-2.2.8.RELEASE.jar:2.2.8.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_211]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_211]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_211]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_211]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]
There is no message with the matching correlation id on the reply topic within the timeout. It seems that at least using Kafka Streams DSL there's no way to support the ReplyingKafkaTemplate
.
Upvotes: 0
Views: 1596
Reputation: 174494
Your scenario doesn't make sense; your KStream is grouping multiple inputs; request/reply is 1 request 1 reply.
This works fine...
@SpringBootApplication
@EnableKafkaStreams
public class So58193901Application {
private static final String REQUEST_TOPIC_NAME = "requests";
private static final String REPLY_TOPIC_NAME = "replies";
public static void main(String[] args) {
SpringApplication.run(So58193901Application.class, args);
}
@Bean
public KStream<byte[], byte[]> stream(StreamsBuilder builder) {
KStream<byte[], byte[]> stream = builder.stream(REQUEST_TOPIC_NAME);
stream
.mapValues(val -> new String(val).toUpperCase().getBytes())
.to(REPLY_TOPIC_NAME);
return stream;
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name(REQUEST_TOPIC_NAME).partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name(REPLY_TOPIC_NAME).partitions(1).replicas(1).build();
}
@Bean
public ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer(REPLY_TOPIC_NAME);
return new ReplyingKafkaTemplate<>(pf, replyContainer);
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
System.out.println(template.sendAndReceive(new ProducerRecord<>(REQUEST_TOPIC_NAME, "bar", "foo"))
.get(10, TimeUnit.SECONDS).value());
};
}
}
Upvotes: 1