Reputation: 45
I am new to Spring Cloud and Kafka Streams. I am trying to set up a Spring Cloud application with the Kafka binder. I tried testing the Kafka Streams processor locally, but I am unable to print any logs.
My Kafka message will contain a JSONObject. The KafkaStreamListener class is:
@Configuration
public class KafkaStreamListener {

    private static Logger logger = LogManager.getLogger(KafkaStreamListener.class);

    // Bean for processing autonomous messages
    @Bean
    public Function<KStream<String, JSONObject>, KStream<String, JSONObject>> autonomousProcessor() {
        System.out.println("start of stream processor%%%%%%%%%%%%%%%%%%%%%**************************");
        logger.info("inside processor");
        return kstream -> kstream.filter((key, value) -> {
            System.out.println(value.toString());
            return true;
        });
    }
}
application.properties:
#Processor group with inputs and outputs
spring.cloud.stream.function.definition = autonomousProcessor
spring.cloud.stream.bindings.autonomousProcessor-in-0.destination = INPUT_TOPIC
spring.cloud.stream.bindings.autonomousProcessor-out-0.destination = OUTPUT_TOPIC
spring.cloud.stream.kafka.streams.binder.functions.autonomousProcessor.application-id= autonomousProcessorGroup
Issue: In debug mode, the breakpoint reaches the filter step directly and then nothing happens. It skips the logger and the SOP. Not sure what the issue may be. Spring Cloud version: Hoxton.SR11
Upvotes: 0
Views: 471
Reputation: 5924
I think what you are seeing is the right behavior. Your function is invoked by the binder only once, at bootstrap time, and that is when the initial SOP and logger calls run (again, only once). You will see them get called if you set a breakpoint on the first SOP or logger line when starting the app. Then, when the Kafka topic receives data, the provided lambda (with the filter) is called. The SOP inside your filter should log value.toString() on each invocation of the filter, once per record.
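To make the once-vs-per-record distinction concrete, here is a minimal plain-Java sketch (no Kafka or Spring involved, all names are illustrative): the body of the factory method runs once, like your bean method at bootstrap, while the predicate captured by the returned function runs once per element, like your filter per record.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class BootstrapDemo {
    static final AtomicInteger bootstrapCalls = new AtomicInteger();
    static final AtomicInteger perRecordCalls = new AtomicInteger();

    // Mirrors the shape of the bean method: this body runs once,
    // analogous to the SOP/logger at the top of autonomousProcessor().
    static Function<List<String>, List<String>> autonomousProcessor() {
        bootstrapCalls.incrementAndGet();
        Predicate<String> filter = value -> {
            // Analogous to the SOP inside kstream.filter(): runs per record
            perRecordCalls.incrementAndGet();
            return true;
        };
        return records -> records.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // "Bootstrap": the function is built exactly once
        Function<List<String>, List<String>> fn = autonomousProcessor();
        // "Records arrive": the lambda runs once per element
        fn.apply(List.of("a", "b", "c"));
        System.out.println(bootstrapCalls.get() + " " + perRecordCalls.get());
    }
}
```

Running this prints `1 3`: one bootstrap invocation, three per-record invocations, which is exactly the pattern you observed in the debugger.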
Upvotes: 1