t0il3ts0ap
t0il3ts0ap

Reputation: 558

How to configure GlobalChannelInterceptor for spring-cloud-stream?

I have a spring-cloud-stream worker which is using kafka bindings

@Slf4j
@EnableBinding(KafkaStreamsProcessor.class)
@RequiredArgsConstructor
public class SomeWorker {

    @StreamListener(Sink.INPUT)
    @SendTo(Source.OUTPUT)
    public KStream<?, Obj> process(KStream<?, Obj> objStream) {
        return objStream.something();
    }
}

And a global-interceptor

@Component
@Slf4j
@GlobalChannelInterceptor
public class StreamInterceptor implements ChannelInterceptor {

    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
        log.info("In preSend");
        return msg;
    }

    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
        log.info("In postSend");
    }

    @Override
    public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
        log.info("In afterSendCompletion");
    }

    @Override
    public boolean preReceive(MessageChannel mc) {
        log.info("In preReceive");
        return true;
    }

    @Override
    public Message<?> postReceive(Message<?> msg, MessageChannel mc) {
        log.info("In postReceive");
        return msg;
    }

}

On receiving any message on stream, the GlobalChannelInterceptor is not called.

What am I missing ?

ps: I am following this test https://github.com/spring-cloud/spring-cloud-stream/blob/master/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/interceptor/BoundChannelsInterceptedTest.java#L67

Upvotes: 2

Views: 2001

Answers (1)

Gary Russell
Gary Russell

Reputation: 174729

The Kafka Streams binder is not based on MessageChannels so there is no channel to intercept.

Upvotes: 1

Related Questions