KP Kurian

Reputation: 41

Stop Spring Cloud Stream @StreamListener from listening until receipt of some Spring Event

I am working on a Camunda BPM Spring Boot application. The application reads messages from a RabbitMQ queue using Spring Cloud Stream. Once a message is received, the application invokes a process instance in Camunda.

If there are already messages in the RabbitMQ queue at application startup, the Cloud Stream listener starts reading them even before Camunda is initialized.

Is it possible to stop the Cloud Stream listener from listening to the queue until some event is fired, in this case Camunda's PostDeployEvent?

I have created a sample application for reference: https://github.com/kpkurian/spring-cloud-stream-camunda

Thanks!!

Upvotes: 2

Views: 4030

Answers (2)

Mounir Messaoudi

Reputation: 373

Spring Cloud Stream (Kafka binder) added methods for pausing and resuming the consumer:

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        // Pause consumption from this topic partition after handling the message
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            // Resume any paused partitions once the container goes idle
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }
}

Please check the docs: https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples

But I think there is an issue with the pause method: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479

PS: you can get the partition id and topic name in the listener:

@StreamListener(Sink.INPUT)
public void in(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
}

Or in a global errorChannel listener:

@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
    Message<?> failedMessage = ((ErrorMessage) message).getOriginalMessage();
    Consumer<?, ?> consumer = (Consumer<?, ?>) failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
    int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
    String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
}

Upvotes: 0

KP Kurian

Reputation: 41

As suggested by @OlegZhurakousky:

Issue

RuntimeService is autowired, and the assumption is that by the time the application is started, all services, beans, etc. are fully initialized. If it is still going through initialization and startup, then it is not properly implemented from a Spring idioms perspective.

Solution

Wrap RuntimeService with a custom Lifecycle implementation that does not hand the service out until its start() method has executed, ensuring that RuntimeService is ready to go.

I have implemented this in the sample GitHub application.
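The gating idea can be sketched without Spring as a plain latch-based wrapper. This is a minimal sketch, not the code from the sample repository: the class name RuntimeServiceGate and its methods are illustrative, and in the real application the wrapper would be a Spring Lifecycle/SmartLifecycle bean whose start() is triggered from the Camunda PostDeployEvent listener.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: callers (e.g. the stream listener) block on the gate
// until start() has been invoked, e.g. from a PostDeployEvent handler.
public class RuntimeServiceGate {
    private final CountDownLatch started = new CountDownLatch(1);

    // Invoked once Camunda has finished deploying (PostDeployEvent).
    public void start() {
        started.countDown();
    }

    public boolean isStarted() {
        return started.getCount() == 0;
    }

    // The stream listener calls this before touching RuntimeService;
    // it blocks until start() has run, or the timeout expires.
    public boolean awaitStarted(long timeout, TimeUnit unit) throws InterruptedException {
        return started.await(timeout, unit);
    }

    public static void main(String[] args) throws Exception {
        RuntimeServiceGate gate = new RuntimeServiceGate();
        // Simulate Camunda firing PostDeployEvent shortly after startup.
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) {}
            gate.start();
        }).start();
        boolean ready = gate.awaitStarted(5, TimeUnit.SECONDS);
        System.out.println("ready=" + ready); // prints ready=true
    }
}
```

With this shape, messages that arrive before Camunda is initialized simply wait at the gate instead of failing, which is the behavior the question asks for.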

Upvotes: 2
