Corenull
Corenull

Reputation: 23

Routing conditional with spring cloud streams functional

I have some issues after the old imperative programming type was deprecated. I have two microservices (one as publisher and the other as subscriber) and in the old way, with the annotation @StreamListener(target = "events", condition = "headers['type']=='consumerPermissionEvent'") i was able to have two functions listening only that records and now i don't know how to do it.

I was reading all the documentation event routing and trying with the routing-expression but the two consumers are reading all the records.

The application yaml of the first microservies:

spring:
  cloud:
    stream:
      bindings:
        output: 
          destination: topicEvents

The seconds application yaml is:

spring:
  cloud:
    function:
      routing-expression: headers['type']
      definition: consumerPermissionEvent;consumerApiEvent
    stream:
      bindings:
        consumerPermissionEvent-in-0:
          destination: topicUsers
        consumerApiEvent-in-0:
          destination: topicUsers

I'm sending from the first microservice like that:

@Autowired
private StreamBridge bridge;

public void send(PermissionEvent event){
    Message<PermissionEvent> message = MessageBuilder.withPayload(event)
            .setHeader("type","consumerPermissionEvent").build();
    bridge.send("output", message);
}

And the second microservice has two consumers:

    @Bean
    public Consumer<Message<ApiEvent>> consumerApiEvent() {
        return e -> log.debug("READED API EVENT: {}", e.getPayload());
    }

    @Bean
    public Consumer<Message<PermissionEvent>> consumerPermissionEvent() {
        return e -> log.debug("READED PERMISSION EVENT: {}", e.getPayload());
    }

And the output logs from the second microservice:

[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED API EVENT: ApiEvent(apiId=null)
[KafkaConsumerDestination{consumerDestinationName='topicUsers', partitions=1, dlqName='null'}.container-0-C-1] [20b662594644cf2e] DEBUG c.m.o.v.eda.subscribers.NotificationSuscriber - READED PERMISSION EVENT: PermissionEvent(userRole=roleUseradsf)

Any ideas how to do it?

Thanks in advance

Upvotes: 2

Views: 2863

Answers (1)

Ish Mahajan
Ish Mahajan

Reputation: 239

you will need to enable routing first by using following property:

--spring.cloud.stream.function.routing.enabled=true

for more details, refer to https://cloud.spring.io/spring-cloud-stream/reference/html/spring-cloud-stream.html#spring_cloud_function

Upvotes: 3

Related Questions