Reputation: 12022
Spring allows interception of messages in many of its products, such as RestTemplate and Spring MVC. Is it possible to intercept Spring Cloud Stream messages, both incoming and outgoing?
Upvotes: 6
Views: 7910
Reputation: 1285
I needed to collect all the messages sent in an integration test so that I could assert on the expected message types. Here is my solution:
@TestConfiguration
public static class TstMessageCaptureConfig {

    final List<Message<byte[]>> allMessagesSent = new ArrayList<>();

    @Bean
    ChannelInterceptor captureChannelInterceptor() {
        ChannelInterceptor assertionInterceptor =
            new ChannelInterceptor() {
                @Override
                public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
                    Message<byte[]> result = (Message<byte[]>) message;
                    String msgPayload = new String(result.getPayload());
                    allMessagesSent.add(result);
                    System.out.println("MessageChannel EEEE>>" + channel);
                    System.out.println("Msg Captured EEEE>>" + msgPayload);
                }
            };
        return assertionInterceptor;
    }

    @Bean
    public GlobalChannelInterceptorWrapper channelInterceptorWrapper(ChannelInterceptor captureChannelInterceptor) {
        GlobalChannelInterceptorWrapper globalChannelInterceptorWrapper = new GlobalChannelInterceptorWrapper(captureChannelInterceptor);
        return globalChannelInterceptorWrapper;
    }
}
Upvotes: 0
Reputation: 12022
I was able to intercept inbound and outbound Spring Cloud Stream messages using the GlobalChannelInterceptor annotation and the ChannelInterceptor interface. See the sample below.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

@Component
@GlobalChannelInterceptor
public class Interceptor implements ChannelInterceptor {

    private final Logger log = LoggerFactory.getLogger(Interceptor.class);

    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
        log.info("In preSend");
        return msg;
    }

    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
        log.info("In postSend");
    }

    @Override
    public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
        log.info("In afterSendCompletion");
    }

    @Override
    public boolean preReceive(MessageChannel mc) {
        log.info("In preReceive");
        return true;
    }

    @Override
    public Message<?> postReceive(Message<?> msg, MessageChannel mc) {
        log.info("In postReceive");
        return msg;
    }

    @Override
    public void afterReceiveCompletion(Message<?> msg, MessageChannel mc, Exception excptn) {
        log.info("In afterReceiveCompletion");
    }
}
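To see when the send-side hooks fire without starting a binder, here is a minimal sketch that attaches a ChannelInterceptor to a plain spring-messaging channel by hand. It assumes Spring Framework 5 or later (where ChannelInterceptor has default methods) and uses ExecutorSubscribableChannel purely for illustration; the class name InterceptorOrderDemo is made up, and this is not how Spring Cloud Stream wires its own channels.

```java
import java.util.ArrayList;
import java.util.List;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;

// Hypothetical demo class: records the order in which send hooks are invoked.
public class InterceptorOrderDemo {

    public static List<String> run() {
        List<String> events = new ArrayList<>();

        // Only the send-side hooks are overridden; the rest keep their defaults.
        ChannelInterceptor recorder = new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                events.add("preSend");
                return message; // returning null would cancel the send
            }

            @Override
            public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
                events.add("postSend");
            }

            @Override
            public void afterSendCompletion(Message<?> message, MessageChannel channel,
                                            boolean sent, Exception ex) {
                events.add("afterSendCompletion");
            }
        };

        // With no executor configured, handlers run on the calling thread.
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel();
        channel.addInterceptor(recorder);
        channel.subscribe(message -> events.add("handled"));

        channel.send(MessageBuilder.withPayload("hello").build());
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

On a subscribable channel like this one, only the send hooks are expected to fire; the preReceive/postReceive/afterReceiveCompletion hooks apply to pollable channels, so they may never be logged depending on which channel types your bindings use.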
Upvotes: 14
Reputation: 2410
Not sure what you mean by interception here - both examples you give are not message-based :).
But if you want access to the full message, you can declare it as an argument to a @StreamListener- or @ServiceActivator-annotated method. Also, Spring Cloud Stream allows you to set up a full Spring Integration pipeline, so you can add advices and anything else you need - see here: https://github.com/spring-projects/spring-integration-java-dsl/wiki/Spring-Integration-Java-DSL-Reference.
I would encourage you to take a look at the Spring Integration reference as well: http://docs.spring.io/autorepo/docs/spring-integration/4.2.6.RELEASE/reference/html/. Spring Cloud Stream injects the channels automatically, and from there you have full freedom in how you construct your pipeline.
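As a rough illustration of the first suggestion, the listener below takes the whole Message rather than just the payload, which gives it access to the headers too. It is a sketch that assumes the annotation-based programming model (@EnableBinding, @StreamListener, and the Sink binding interface) is available in your Spring Cloud Stream version; the class name FullMessageListener and the describe helper are made up for the example.

```java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;

// Hypothetical listener bound to the default "input" channel of Sink.
@EnableBinding(Sink.class)
public class FullMessageListener {

    // Declaring the parameter as Message<String> instead of String
    // exposes the headers alongside the payload.
    @StreamListener(Sink.INPUT)
    public void handle(Message<String> message) {
        System.out.println(describe(message));
    }

    // Illustrative helper: summarizes the payload and the contentType header.
    public static String describe(Message<?> message) {
        Object contentType = message.getHeaders().get(MessageHeaders.CONTENT_TYPE);
        return "payload=" + message.getPayload() + ", contentType=" + contentType;
    }
}
```

This only observes messages arriving at that one listener, so it is closer to "seeing the full message" than to intercepting every channel.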
Hope this helps, Marius
Upvotes: 0