santankrish

Reputation: 21

Intercepting Spring Cloud Stream Messages from Consumer only

I am currently using Spring Cloud Stream with the Kafka binder and a GlobalChannelInterceptor to log messages in my Spring Boot microservices.

I have:

  1. a producer that publishes messages to a SubscribableChannel
  2. a consumer that listens to the stream (using the @StreamListener annotation)

When a message is published to the stream by the producer and received by the consumer, I observe that the preSend method is triggered twice:

  1. Once on the producer side, when the message is published to the stream
  2. Once on the consumer side, when the message is received from the stream

However, for my logging purposes, I only need to intercept and log the message on the consumer side.

Is there any way to intercept the SCS message on ONLY one side (e.g. the consumer side)?

I would appreciate any thoughts on this matter. Thank you!

Ref:

  1. GlobalChannelInterceptor documentation - https://docs.spring.io/spring-integration/api/org/springframework/integration/config/GlobalChannelInterceptor.html

EDIT

Producer

public void sendToPushStream(PushStreamMessage message) {
    try {
        boolean results = streamChannel.pushStream().send(MessageBuilder.withPayload(new ObjectMapper().writeValueAsString(message)).build());
        log.info("Push stream message {} sent to {}.", results ? "successfully" : "not", StreamChannel.PUSH_STREAM);
    } catch (JsonProcessingException ex) {
        log.error("Unable to parse push stream message.", ex);
    }
}

Producer's streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    @Output(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();

}

Consumer

@StreamListener(StreamChannel.PUSH_STREAM)
public void handle(Message<PushStreamMessage> message) {
    log.info("Incoming stream message from {}, {}", streamChannel.pushStream(), message);

}

Consumer's streamChannel

public interface StreamChannel {

    String PUSH_STREAM = "PushStream";

    @Input(StreamChannel.PUSH_STREAM)
    SubscribableChannel pushStream();

}

Interceptor (Common Library)

public class GlobalStreamInterceptor extends ChannelInterceptorAdapter {

    @Override
    public Message<?> preSend(Message<?> msg, MessageChannel mc) {
        log.info("presend " + msg);
        return msg;
    }

    @Override
    public void postSend(Message<?> msg, MessageChannel mc, boolean sent) {
        log.info("postSend " + msg);
    }

}

Upvotes: 0

Views: 2469

Answers (1)

Artem Bilan

Reputation: 121272

Right. Why not use the GlobalChannelInterceptor options and apply the patterns attribute, which the documentation describes as:

An array of simple patterns against which channel names will be matched.

So, you may have something like this:

@GlobalChannelInterceptor(patterns = Processor.INPUT)

Or use the custom name of the input channel in your SCSt app.
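To make the idea of matching channel names against simple patterns concrete, here is a minimal plain-Java sketch. It is not Spring Integration's own matcher: the class ChannelPatternSketch and its methods are hypothetical names made up for illustration, and the sketch assumes that `*` stands for any run of characters. The real matcher may differ in details. The channel names are the ones used in this thread.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, NOT part of Spring: approximates "simple pattern"
// matching where '*' is assumed to stand for any run of characters.
final class ChannelPatternSketch {

    // Returns true if channelName matches the pattern as a whole.
    static boolean matches(String pattern, String channelName) {
        // Split on '*' and keep empty pieces so leading/trailing wildcards survive.
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*"); // each '*' becomes "any characters"
            }
            regex.append(Pattern.quote(parts[i])); // literal text is matched verbatim
        }
        return Pattern.matches(regex.toString(), channelName);
    }

    // Returns true if the channel name matches at least one of the patterns.
    static boolean matchesAny(List<String> patterns, String channelName) {
        for (String pattern : patterns) {
            if (matches(pattern, channelName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // An exact-name pattern selects only the channel with that name.
        System.out.println(matches("input", "input"));
        System.out.println(matches("input", "output"));
        // A wildcard pattern selects every channel whose name starts with "Push".
        System.out.println(matches("Push*", "PushStream"));
    }
}
```

Under this assumption, a pattern equal to the input channel's name selects that channel and skips a channel named differently. That is why giving the input channel a name that differs from the output channel's name lets the pattern tell them apart.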

Upvotes: 1
