Reputation: 11
I have a Spring cloud stream application implemented in Functional approach. the app consumes events from multiple Kafka topics, normalizes the input into output schema (always same schema) and publishes to Kafka. I am not using Kafka-streams since no join/enrichment/state is required.
I want to allow flexible deployment by controlling the input topics to consume from at runtime: you can either consume from all topics or from single topic. my way to do it was to declare dedicated function for each type, and a dedicated binding for each function. The problem is that the binder (there is a single one) routes all incoming messages to all bindings, and I get ClassCastException when the wrong function is called to handle some event type.
I thought of the following solutions, yet I want to know if there is a better way:
my application.yaml looks like this:
spring:
cloud:
function:
definition: data;more
stream:
default-binder: kafka-string-avro
bindings:
data-in-0:
binder: kafka-string-avro
destination: data.emails.events
group: communication_system_events_data_gp
data-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
more-in-0:
binder: kafka-string-avro
destination: communication.emails.send.status
group: communication_system_events_more_gp
more-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
my functions:
@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
return new MoreFunction();
}
Upvotes: 0
Views: 818
Reputation: 5914
Not sure where the issue is, but I am seeing some configuration issues with what you provided. It might be a typo when you copied to the question, but the following config should isolate the two different topics to their corresponding functions.
spring:
cloud:
function:
definition: dataFunction;moreFunction
stream:
default-binder: kafka-string-avro
bindings:
dataFunction-in-0:
binder: kafka-string-avro
destination: data.emails.events
group: communication_system_events_data_gp
dataFunction-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
moreFunction-in-0:
binder: kafka-string-avro
destination: communication.emails.send.status
group: communication_system_events_more_gp
moreFunction-out-0:
binder: kafka-string-avro
destination: communication.system.emails.events
producer:
useNativeEncoding: true
@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
return new MoreFunction();
}
Upvotes: 2