Reputation: 51
We have multiple applications consumer listening to the same kafka topic and a producer sets the message header when sending message to the topic so specific instance can evaluate the header and process the message. eg
@StreamListener(target=ITestSink.CHANNEL_NAME,condition="headers['franchiseName'] == 'sydney'")
public void fullfillOrder(@Payload TestObj message) {
log.info("sydney order request received message is {}",message.getName());
}
In Spring Cloud Stream 3.0.0 the @StreamListener is deprecated and I could not find the equivalent of the condition property in Function.
Any suggestion?
Upvotes: 5
Views: 7387
Reputation: 200
Though I was not able to find the equivalent for the functional approach either, I do have a suggestion.
The @StreamListener
annotations condition does not stop the fact that the application must consume the message, read its header, and filter out specific records before passing it to the listener (fullfillOrder()
). So it's safe to assume you're consuming every message that hits the topic regardless (by the event receiver that Spring Cloud has implemented for us under the hood), but the listener only gets executed when header == sydney.
If there was a way to configure the event receiver that Spring Cloud uses (to discard message before hitting listener), I would suggest looking into that. If not, would resort to filtering out any messages (non-sydney) before doing any processing. If you're familiar with Spring Cloud's functional approach, would look something like this:
@Bean
public Consumer<Message<TestObj>> fulfillOrder() {
return msg -> {
// to get header - msg.getHeaders().get(key, valueType);
// filter out bad messages
}
}
or
@Bean
public Consumer<ConsumerRecord<?, TestObj>> fulfillOrder() {
return msg -> {
// msg.headers().lastHeader("franchiseName").value() -> filter em out
}
}
Other:
^ my code assumes you're integrating the kafka-client API with Spring cloud stream via spring-cloud-stream-binder-kafka
. based on tags listed, i will note Spring Cloud Stream has two versions of binders for Kafka - one for the kafka client library, and one for kafka streams library.
Without considering Spring Cloud / Frameworks, the high-lvl DSL in kafka streams doesn't give you access to headers, but the low-level Processor API does. From the example, it seems like you're leveraging the client binder and not spring-cloud-stream-binder-kafka-streams
/ kafka streams binder. I haven't seen an implementation of spring cloud stream + kafka streams binder using the low-level processor API, so i can't tell if that was the aim.
Upvotes: 1