Reputation: 309
I'm trying to implement a retry mechanism for my kafka stream application. The idea is that I would get the consumer and partition ID as well as the topic name from the input topic and then pause the consumer for the duration stored in the payload.
I've searched for documentations and examples but all I found are examples based on the classic bindings provided by spring-cloud-stream. I'm trying to see if there's a way to get access to these info with functional style.
For example the following code can give me access to the consumer with classic binding style.
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
How do I get the equivalence with the Functional Style?
I tried with the following code but I'm getting exception saying no such binding is found.
@Bean
public Function<Message<?>, KStream<String, String>> process() {
message -> {
Consumer<?, ?> consumer = message.getHeaders().get(KafkaHeaders.Consumer, Consumer.class);
String topic = message.getHeaders().get(KafkaHeaders.Topic, String.class);
Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
CustomPayload payload = (CustomPayload) message.getPayload();
if (payload.getRetryTime() < System.currentTimeMillis()) {
consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
}
}
}
Exception I got
Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.springframework.messaging.Message among registered factories: channelFactory,messageSourceFactory,kStreamBoundElementFactory,kTableBoundElementFactory,globalKTableBoundElementFactory
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.getBindingTargetFactory(AbstractBindableProxyFactory.java:82)
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.bindInput(KafkaStreamsBindableProxyFactory.java:191)
at org.springframework.cloud.stream.binder.kafka.streams.function.KafkaStreamsBindableProxyFactory.afterPropertiesSet(KafkaStreamsBindableProxyFactory.java:111)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1790)
... 96 more
Upvotes: 0
Views: 1262
Reputation: 5904
In your functional bean example, you are mixing both Message
and KStream
. That is the reason for that specific exception. The functional bean could be rewritten as below.
@Bean
public java.util.function.Consumer<Message<?>> process() {
return message -> {
Consumer<?, ?> consumer = message.getHeaders().get(KafkaHeaders.Consumer, Consumer.class);
String topic = message.getHeaders().get(KafkaHeaders.Topic, String.class);
Integer partitionId = message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class);
CustomPayload payload = (CustomPayload) message.getPayload();
if (payload.getRetryTime() < System.currentTimeMillis()) {
consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)));
}
}
}
Upvotes: 1