Josemy
Josemy

Reputation: 838

How to keep listening for messages with Vertx and Redis?

I have a Listener like this:

public class Listener extends AbstractVerticle {
    public static void main(String[] args) {
        Launcher.executeCommand("run", Listener.class.getName());
    }

    @Override
    public void start() {
        RedisOptions config = new RedisOptions()
                .setHost("127.0.0.1");

        RedisClient redis = RedisClient.create(vertx, config);

        redis.blpop("myKey", 3500, System.out::println);

    }

}

And when a message is received, it is successfully printed on console, but no other message can be received anymore. And if I run the blpop inside a while(true) I get an Exception for blocking the thread.

How can I permanently listen for message with Vertx and Redis?

What I really need to archive is some kind of Round Robin with many "listeners" but only one getting the message at the time. I mean, one to many publishers to a specific channel, and one to many subscribers, but when a message us published just one subscriber gets the message.

Upvotes: 0

Views: 527

Answers (3)

user2308926
user2308926

Reputation: 1

How can I permanently listen for message with Vertx and Redis?

Another option would be to use pub/sub channels via Redis.

You can find some good examples of this in the following links:

And here is a quick code example of this:

Redis
 .createClient(vertx, "redis://127.0.0.1:7006")
 .connect()
 .onSuccess(conn -> {
   conn.handler(message -> {
   // insert you code for message handling in here
                               });

   // now we subscribe
   conn
   .send(Request
   .cmd(Command.PSUBSCRIBE)
   .arg("new*"))
   .onSuccess(ok -> {
    // subscribed to all events matching pattren "new*"
    });
}

Upvotes: 0

florin oltean
florin oltean

Reputation: 1

There are several ways to do what you ask for:

  1. with a timer, as described already; but this has a drawback that limits the throughput to minimum timer delay (i.e. 1ms)
  2. simply use an external thread not managed by Vert.x and do a brpop in an infinite loop while sending results via EventBus
  3. Look into the code for Vertx::setTimer and build something similar but without the added delay (i.e. you will end up submitting tasks to NIO event loop)
  4. Use a Vertical that is using event bus to trigger itself; it sends to itself events to do brpop; these events addressed to self are sent out after brpop returns.

example for last point:

private void brpop(final String myAddress, final String destinationAddress,final String queue, final int brpopTimeout){
    EventBus bus = vertx.eventBus();
    redis.brpop(Arrays.asList(queue,""+brpopTimeout)).onComplete(c->{
      if(c.succeeded()){
        Response r = c.result();
        System.out.println("result from read queue:"+r.get(1));
        bus.send(myAddress,MSG_CONTINUE); // signal to read again
        bus.send(destinationAddress,r.get(1)); // send the value read from queue on the event bus to be picked up by someone else
      }else{
        Throwable t = c.cause();
        Response r = c.result();
        System.out.println("error reading from queue:" + t + r);
        bus.send(myAddress,MSG_CONTINUE);
      }
    });
}
public void start(Promise<Void> startPromise) throws Exception {
    client = Redis.createClient(vertx,"redis://localhost:6379");
    redis = RedisAPI.api(client);
    client.connect().onSuccess(c->{
      System.out.println("connected ok");
      EventBus bus = vertx.eventBus();
      // use local consumer to get events to self
      MessageConsumer<String> consumer = bus.localConsumer("brpop-my", m->{
        brpop(MY_ADDRESS,"read-from-ingress","sample",10);
      });
      // kick off the read loop from redis
      bus.send(MY_ADDRESS,MSG_CONTINUE);
    }).onFailure(f->{
      System.out.println("failed to connect: " + f);
    });
  }

Upvotes: 0

Alexey Soshin
Alexey Soshin

Reputation: 17721

This could be divided into two questions:
1) How to listen continuously in VertX
2) How can we implement sending messages to only one verticle

As of first question, you can use setInterval:

vertx.setPeriodic(1000, t -> {
   redis.blpop("myKey", 3500, h -> { 
         System.out.println(h.result()); // Need to check for success() 
   }
}));

This won't block the EventLoop

As of second question, use vertx.eventBus().send() to send a message to one of the consumers in a Round-Robin fashion: http://vertx.io/docs/vertx-core/java/#_the_event_bus_api

Upvotes: 1

Related Questions