Reputation: 2912
I have the Spring Boot code below to receive values whenever a new record is appended to a Redis stream. The problem is that the receiver never receives any message. Also, the subscription, when checked with subscription.isActive(), is always inactive. What's wrong with this code? What did I miss? Doc for reference.
On Spring Boot startup, initialize the necessary Redis resources:
Lettuce connection factory
@Bean
public RedisConnectionFactory redisConnectionFactory() {
    return new LettuceConnectionFactory("127.0.0.1", 6379);
}
RedisTemplate from the connection factory
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
    redisTemplate.setConnectionFactory(connectionFactory);
    return redisTemplate;
}
REST controller to append data to the Redis stream
@PutMapping("/{name}")
public String post(@PathVariable String name) {
    return redisTemplate.opsForStream().add(StreamRecords.newRecord().in("streamx").ofObject(name)).getValue();
}
JMS-style imperative message listener
@Component
public class MyStreamListener implements StreamListener<String, MapRecord<String, String, String>> {
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        System.out.println("message received: " + message.getValue());
    }
}
Initialize the listener
@Bean
public Subscription listener(MyStreamListener streamListener, RedisConnectionFactory redisConnectionFactory) throws InterruptedException {
    StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
            .create(redisConnectionFactory);
    Subscription subscription = container.receive(Consumer.from("my-group-1", "consumer-1"),
            StreamOffset.create("streamx", ReadOffset.latest()), streamListener);
    System.out.println(subscription.isActive()); // always false
    return subscription;
}
I am, however, able to append to the stream through the API.
Upvotes: 4
Views: 3231
Reputation: 2912
The important step is to start the StreamMessageListenerContainer after the subscription has been registered:
container.start();
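For context, here is a minimal sketch of how the listener bean from the question might look with that call added. The class name StreamListenerConfig is hypothetical, MyStreamListener is the listener from the question, and the sketch assumes the consumer group my-group-1 already exists on streamx (for example, created beforehand with XGROUP CREATE); reading as part of a group that does not exist fails on the Redis side.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.data.redis.stream.Subscription;

// Hypothetical configuration class; the name is only for illustration.
@Configuration
public class StreamListenerConfig {

    @Bean
    public Subscription listener(MyStreamListener streamListener,
                                 RedisConnectionFactory redisConnectionFactory) {
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory);

        // Register the subscription first (assumes group "my-group-1" already exists).
        Subscription subscription = container.receive(
                Consumer.from("my-group-1", "consumer-1"),
                StreamOffset.create("streamx", ReadOffset.latest()),
                streamListener);

        // Without this call the container never polls, so the listener stays silent.
        container.start();

        return subscription;
    }
}
```

Because the container begins polling on its own thread, subscription.isActive() may still report false for a short moment right after start() returns, so checking it immediately is not a reliable test.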
Upvotes: 5