Kirkland

Reputation: 843

How do I Connect Spring Cloud Stream Functional Beans to a Kafka Binder?

I'm following the Spring Cloud Stream documentation to work out how to connect my microservice to Kafka through the binder I've already added as a Gradle dependency. As a first step, I created a simple @Bean method returning Function<String, String> in my Spring Boot application class and verified from the command line that it communicates with Kafka through the uppercase-in-0 and uppercase-out-0 topics, as described at the beginning of the documentation. I then created the following class, expecting it to be picked up by auto-discovery:

package com.yuknis.loggingconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

}

package com.yuknis.loggingconsumer.functions;

import java.util.function.Function;

public class CharCounter implements Function<String, Integer> {

    /**
     * Applies this function to the given argument.
     *
     * @param s the function argument
     * @return the function result
     */
    @Override
    public Integer apply(String s) {
        return s.length();
    }

}

The application.properties file contains:

spring.cloud.function.scan.packages:com.yuknis.loggingconsumer.functions

I'm not 100% sure what should happen now, but I assumed Spring would see the class and automatically create charcounter-in-0 and charcounter-out-0 topics that I could publish to and consume from, with the data in those topics passing through the function. That isn't what's happening. What might I be missing? Is this class supposed to create the topics the same way the @Bean does?

Upvotes: 0

Views: 345

Answers (1)

Kirkland

Reputation: 843

Even though each function is loaded when spring.cloud.function.scan.packages is set to a package and spring.cloud.function.scan.enabled is set to true, that alone doesn't create the topics. You still need to set spring.cloud.function.definition to the Function, Consumer, or Supplier you'd like to have communicate with Kafka, like so:

spring.cloud.function:
  scan:
    enabled: true
    packages: com.yuknis.loggingconsumer.functions
  definition: charCounter;lowercase;uppercase

After that, it will create the charCounter-in-0 and charCounter-out-0 topics, which can be mapped to other topic names if necessary with the spring.cloud.stream.bindings.charCounter-in-0.destination and spring.cloud.stream.bindings.charCounter-out-0.destination properties.
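
To make the naming concrete, here is a minimal plain-Java sketch (no Spring involved) of how the binding names above appear to be derived from the definition entries. It assumes the scanned function is registered under its simple class name with the first letter lower-cased, and that every entry is a Function with one input and one output. BindingNameSketch, functionName and bindingsFor are hypothetical helper names used only for illustration; they are not part of Spring Cloud Stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BindingNameSketch {

    // Same function as in the question, nested here only to keep the sketch self-contained.
    static class CharCounter implements Function<String, Integer> {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    }

    // Hypothetical helper: lower-cases the first letter of the simple class name.
    // Assumption: this mirrors how the scanned class ends up being named "charCounter".
    static String functionName(Class<?> type) {
        String simple = type.getSimpleName();
        return Character.toLowerCase(simple.charAt(0)) + simple.substring(1);
    }

    // Hypothetical helper: splits a ';'-separated definition and lists the
    // <name>-in-0 / <name>-out-0 binding names for each entry.
    // Assumption: every entry is a Function; a Consumer would only have an
    // input binding and a Supplier only an output binding.
    static List<String> bindingsFor(String definition) {
        List<String> bindings = new ArrayList<>();
        for (String name : definition.split(";")) {
            bindings.add(name.trim() + "-in-0");
            bindings.add(name.trim() + "-out-0");
        }
        return bindings;
    }

    public static void main(String[] args) {
        // Derive the function name from the class, then list its bindings.
        String name = functionName(CharCounter.class);
        System.out.println(name);
        bindingsFor(name + ";lowercase;uppercase").forEach(System.out::println);
    }
}
```

Note the capital C in charCounter: the question was looking for charcounter-in-0 and charcounter-out-0, which may be one reason the expected topics never showed up.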

Upvotes: 1
