Yosi Bronsberg

Reputation: 11

Spring Cloud Stream: Attaching function to binder by type

I have a Spring Cloud Stream application implemented with the functional approach. The app consumes events from multiple Kafka topics, normalizes the input into an output schema (always the same schema), and publishes to Kafka. I am not using Kafka Streams since no join, enrichment, or state is required.

I want to allow flexible deployment by controlling at runtime which input topics are consumed: either all topics or a single topic. My approach was to declare a dedicated function for each type and a dedicated binding for each function. The problem is that the binder (there is a single one) routes all incoming messages to all bindings, and I get a ClassCastException when the wrong function is called to handle some event type.

I thought of the following solutions, but I want to know if there is a better way:

  1. Having a single binder per binding. I would rather not, especially since I'm using a well-configured binder and I don't want to simply duplicate it.
  2. Having a single binder and a single function of type Message<?> that internally checks the object type, casts it, and handles it by type.

My application.yaml looks like this:
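
For illustration, option 2 might look roughly like the sketch below. It is deliberately Spring-free so it runs on its own (assuming Java 16 or later): the `TypeDispatcher` class and the `Data`, `More`, and `Output` records are hypothetical stand-ins for the real types, and in the actual application the function would accept a `Message<?>` and inspect its payload instead of a bare `Object`.

```java
import java.util.function.Function;

public class TypeDispatcher implements Function<Object, TypeDispatcher.Output> {

    // Hypothetical stand-ins for the real event and output types.
    public record Data(String email) {}
    public record More(String status) {}
    public record Output(String source, String value) {}

    @Override
    public Output apply(Object payload) {
        // Inspect the runtime type and delegate to the matching normalization.
        if (payload instanceof Data data) {
            return new Output("data", data.email());
        }
        if (payload instanceof More more) {
            return new Output("more", more.status());
        }
        // Reject unknown types explicitly rather than failing on a cast later.
        throw new IllegalArgumentException("Unsupported payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }
}
```

The trade-off is that every new event type means touching this one function, which is part of why I would prefer one function per type if the routing can be fixed.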

spring:
    cloud:
        function:
            definition: data;more
        stream:
            default-binder: kafka-string-avro
            bindings:
                data-in-0:
                    binder: kafka-string-avro
                    destination: data.emails.events
                    group: communication_system_events_data_gp
                data-out-0:
                    binder: kafka-string-avro
                    destination: communication.system.emails.events
                    producer:
                        useNativeEncoding: true
                more-in-0:
                    binder: kafka-string-avro
                    destination: communication.emails.send.status
                    group: communication_system_events_more_gp
                more-out-0:
                    binder: kafka-string-avro
                    destination: communication.system.emails.events
                    producer:
                        useNativeEncoding: true

My functions:

@Bean("data")
public Function<Message<Data>, Message<Output>> dataFunction() {
    return new DataFunction();
}
@Bean("more")
public Function<Message<More>, Message<Output>> moreFunction() {
    return new MoreFunction();
}

Upvotes: 0

Views: 818

Answers (1)

sobychacko

Reputation: 5914

Not sure where the issue is, but I see some configuration issues in what you provided. It might be a typo from copying into the question, but the following config should isolate the two topics to their corresponding functions.

spring:
    cloud:
        function:
            definition: dataFunction;moreFunction
        stream:
            default-binder: kafka-string-avro
            bindings:
                dataFunction-in-0:
                    binder: kafka-string-avro
                    destination: data.emails.events
                    group: communication_system_events_data_gp
                dataFunction-out-0:
                    binder: kafka-string-avro
                    destination: communication.system.emails.events
                    producer:
                        useNativeEncoding: true
                moreFunction-in-0:
                    binder: kafka-string-avro
                    destination: communication.emails.send.status
                    group: communication_system_events_more_gp
                moreFunction-out-0:
                    binder: kafka-string-avro
                    destination: communication.system.emails.events
                    producer:
                        useNativeEncoding: true
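
The function beans:

// No explicit bean name: the bean name defaults to the method name,
// so it matches "dataFunction" in the function definition and binding names.
@Bean
public Function<Message<Data>, Message<Output>> dataFunction() {
    return new DataFunction();
}

// Likewise registered as "moreFunction".
@Bean
public Function<Message<More>, Message<Output>> moreFunction() {
    return new MoreFunction();
}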
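
To make the naming rule concrete: for a plain function with one input and one output, Spring Cloud Stream's functional model names the bindings `<functionName>-in-0` and `<functionName>-out-0`. The helper below is only a sketch for checking a config by hand. `BindingNames` and `forDefinition` are hypothetical names, not part of the framework, and the sketch ignores composed functions (`|`) and functions with multiple inputs or outputs.

```java
import java.util.ArrayList;
import java.util.List;

public class BindingNames {

    // Derives the default binding names for each function listed in a
    // spring.cloud.function.definition value such as "dataFunction;moreFunction".
    // Assumes plain functions with a single input and a single output.
    public static List<String> forDefinition(String definition) {
        List<String> names = new ArrayList<>();
        for (String function : definition.split(";")) {
            String name = function.trim();
            if (name.isEmpty()) {
                continue; // tolerate stray separators
            }
            names.add(name + "-in-0");
            names.add(name + "-out-0");
        }
        return names;
    }
}
```

Calling `BindingNames.forDefinition("dataFunction;moreFunction")` yields the four binding keys used in the YAML above, which is a quick way to spot a binding whose name does not line up with a function bean.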

Upvotes: 2
