Reputation: 21
I am currently using Spring Cloud Stream with Kafka binders with a GlobalChannelInterceptor
to perform message-logging for my Spring Boot microservices.
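I have a producer that publishes through a SubscribableChannel and a consumer that listens with the @StreamListener annotation.
Throughout the process, when a message is published to the stream by the producer and received by the consumer, I observed that the preSend method was triggered twice: once on the producer side and once on the consumer side.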
However, for my logging purposes, I only need to intercept and log the message on the consumer side.
Is there any way to intercept the SCS message on ONLY one side (e.g. the consumer side)?
I would appreciate any thoughts on this matter. Thank you!
Ref: GlobalChannelInterceptor documentation - https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html
EDIT
Producer
    public void sendToPushStream(PushStreamMessage message) {
        try {
            boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
            log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
        } catch (JsonProcessingException ex) {
            log.error("Unable to parse push stream message.", ex);
        }
    }
Producer's streamChannel
    public interface StreamChannel {
        String PUSH_STREAM = "PushStream";

        @Output(StreamChannel.PUSH_STREAM)
        SubscribableChannel pushStream();
    }
Consumer
    @StreamListener(StreamChannel.PUSH_STREAM)
    public void handle(Message<PushStreamMessage> message) {
        log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);
    }
Consumer's streamChannel
    public interface StreamChannel {
        String PUSH_STREAM = "PushStream";

        @Input(StreamChannel.PUSH_STREAM)
        SubscribableChannel pushStream();
    }
Interceptor (Common Library)
    public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {

        @Override
        public Message<?> preSend(Message<?> msg, MessageChannel mc) {
            log.info("presend " + msg);
            return msg;
        }

        @Override
        public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
            log.info("postSend " + msg);
        }
    }
Upvotes: 0
Views: 2469
Reputation: 121272
Right, so why not follow the GlobalChannelInterceptor options and apply its patterns attribute, which the documentation describes as "An array of simple patterns against which channel names will be matched."?
So, you may have something like this:
@GlobalChannelInterceptor(patterns = Processor.INPUT)
Or use the custom name of the input channel in your SCSt app.
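As a rough sketch of the suggestion above, the interceptor can be registered as a bean and limited to one channel through the patterns attribute. The configuration class name, the bean name, and the channel name "PushStreamInput" are hypothetical and not taken from the question. The sketch also assumes Spring Framework 5 or later, where ChannelInterceptor has default methods. Note that the question binds both sides to a channel named "PushStream", so an interceptor shared through a common library would presumably still match in both applications unless the input channel gets a distinct name.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.GlobalChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.ChannelInterceptor;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class ConsumerLoggingConfig {

    private static final Logger log = LoggerFactory.getLogger(ConsumerLoggingConfig.class);

    // Assumption: the consumer's input channel has been given this distinct name.
    static final String INPUT_CHANNEL = "PushStreamInput";

    // The interceptor is applied only to channels whose bean name matches the pattern.
    @Bean
    @GlobalChannelInterceptor(patterns = INPUT_CHANNEL)
    public ChannelInterceptor consumerLoggingInterceptor() {
        return new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                log.info("preSend {}", message);
                return message; // pass the message through unchanged
            }
        };
    }
}
```

With a configuration along these lines, preSend should fire only for messages sent to the channel whose name matches the pattern; other channels in the same application context are left alone.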
Upvotes: 1