Reputation: 2222
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model shows an example where the input topic can be set using the property spring.cloud.stream.bindings.process_in.destination
.
Now I want to use dependency injection, e.g.
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(JavaMailSender mailSender) {...}
When starting the application (based on Spring Boot) the property spring.cloud.stream.bindings.process_in.destination
is ignored, and instead the input topic input
is subscribed.
EDIT: Here is the Kotlin code (without imports)
Mailer.kt:
@Configuration
class Mailer {
@Bean
fun sendMail(/*mailSender: JavaMailSender*/) = Consumer<KStream<Any, Mail>> { input ->
input.foreach { _, mail -> println("mail = $mail") }
}
}
Mail.kt:
data class Mail(var from: String = "", var to: String = "", var subject: String = "", var body: String = "")
Application.kt:
@SpringBootApplication
class Application
fun main(args: Array<String>) {
runApplication<Application>(*args) {
}
}
application.yml::
spring.cloud.stream:
bindings.sendMail_in.destination: mail
kafka.binder.configuration.listeners: PLAINTEXT://localhost:9092
Upvotes: 0
Views: 768
Reputation: 5914
There were a few issues in the binder that didn't properly autowire
the beans provided to a function/consumer bean
. Latest snapshot solves those problems though. Please make sure that you are using the latest snapshot (3.0.0.BUILD-SNAPSHOT
). Here is a sample application that works with the same scenario that you provided.
Upvotes: 1