Reputation: 558
I have a spring-cloud-stream worker which is using kafka bindings
@Slf4j
@EnableBinding(KafkaStreamsProcessor.class)
@RequiredArgsConstructor
public class SomeWorker {
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public KStream<?, Obj> process(KStream<?, Obj> objStream) {
return objStream.something();
}
}
And a global-interceptor
@Component
@Slf4j
@GlobalChannelInterceptor
public class StreamInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> msg, MessageChannel mc) {
log.info("In preSend");
return msg;
}
@Override
public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
log.info("In postSend");
}
@Override
public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
log.info("In afterSendCompletion");
}
@Override
public boolean preReceive(MessageChannel mc) {
log.info("In preReceive");
return true;
}
@Override
public Message<?> postReceive(Message<?> msg, MessageChannel mc) {
log.info("In postReceive");
return msg;
}
}
On receiving any message on stream, the GlobalChannelInterceptor is not called.
What am I missing ?
ps: I am following this test https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/interceptor/BoundChannelsInterceptedTest.java#L67
Upvotes: 2
Views: 2001
Reputation: 174729
The Kafka Streams binder is not based on MessageChannel
s so there is no channel to intercept.
Upvotes: 1