vvra
vvra

Reputation: 2912

Receive Redis streams data using Spring & Lettuce

I have the below Spring boot code to receive values whenever a Redis stream is appended with new record. The problem is receiver never receives any message, also, the subscriber, when checked with subscriber.isActive(), is always inactive. Whats wrong in this code? What did I miss? Doc for reference.

On spring boot start, initialize the necessary redis resources

Lettuce connection factory

@Bean
public RedisConnectionFactory redisConnectionFactory() {
    return new LettuceConnectionFactory("127.0.0.1", 6379);
}

RedisTemplate from the connection factory

@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    return redisTemplate;
}

Rest controller to append data to redis stream

@PutMapping("/{name}")
public String post(@PathVariable String name) {
    return redisTemplate.opsForStream().add(StreamRecords.newRecord().in("streamx").ofObject(name)).getValue();
}

JMS style imperative message listener

@Component
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

@Override
public void onMessage(MapRecord<String, String, String> message) {
    System.out.println("message received: " + message.getValue());
}

}

Initialize the listener

  @Bean
public Subscription listener(MyStreamListener streamListener, RedisConnectionFactory redisConnectionFactory) throws InterruptedException {
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
            .create(redisConnectionFactory);
    Subscription subscription = container.receive(Consumer.from("my-group-1", "consumer-1"),
            StreamOffset.create("streamx", ReadOffset.latest())), streamListener);
    System.out.println(subscription.isActive()); // always false
    return subscription;
}

Though, I am able to append to the stream through api.

Upvotes: 4

Views: 3231

Answers (1)

vvra
vvra

Reputation: 2912

The important step is, start the StreamMessageListenerContainer after the subscription is done.

container.start();

Upvotes: 5

Related Questions