Reputation: 838
I have a Listener
like this:
public class Listener extends AbstractVerticle {
public static void main(String[] args) {
Launcher.executeCommand("run", Listener.class.getName());
}
@Override
public void start() {
RedisOptions config = new RedisOptions()
.setHost("127.0.0.1");
RedisClient redis = RedisClient.create(vertx, config);
redis.blpop("myKey", 3500, System.out::println);
}
}
And when a message is received, it is successfully printed on console, but no other message can be received anymore. And if I run the blpop
inside a while(true)
I get an Exception for blocking the thread.
How can I permanently listen for message with Vertx and Redis?
What I really need to archive is some kind of Round Robin with many "listeners" but only one getting the message at the time. I mean, one to many publishers to a specific channel, and one to many subscribers, but when a message us published just one subscriber gets the message.
Upvotes: 0
Views: 527
Reputation: 1
How can I permanently listen for message with Vertx and Redis?
Another option would be to use pub/sub channels via Redis.
You can find some good examples of this in the following links:
And here is a quick code example of this:
Redis
.createClient(vertx, "redis://127.0.0.1:7006")
.connect()
.onSuccess(conn -> {
conn.handler(message -> {
// insert you code for message handling in here
});
// now we subscribe
conn
.send(Request
.cmd(Command.PSUBSCRIBE)
.arg("new*"))
.onSuccess(ok -> {
// subscribed to all events matching pattren "new*"
});
}
Upvotes: 0
Reputation: 1
There are several ways to do what you ask for:
example for last point:
private void brpop(final String myAddress, final String destinationAddress,final String queue, final int brpopTimeout){
EventBus bus = vertx.eventBus();
redis.brpop(Arrays.asList(queue,""+brpopTimeout)).onComplete(c->{
if(c.succeeded()){
Response r = c.result();
System.out.println("result from read queue:"+r.get(1));
bus.send(myAddress,MSG_CONTINUE); // signal to read again
bus.send(destinationAddress,r.get(1)); // send the value read from queue on the event bus to be picked up by someone else
}else{
Throwable t = c.cause();
Response r = c.result();
System.out.println("error reading from queue:" + t + r);
bus.send(myAddress,MSG_CONTINUE);
}
});
}
public void start(Promise<Void> startPromise) throws Exception {
client = Redis.createClient(vertx,"redis://localhost:6379");
redis = RedisAPI.api(client);
client.connect().onSuccess(c->{
System.out.println("connected ok");
EventBus bus = vertx.eventBus();
// use local consumer to get events to self
MessageConsumer<String> consumer = bus.localConsumer("brpop-my", m->{
brpop(MY_ADDRESS,"read-from-ingress","sample",10);
});
// kick off the read loop from redis
bus.send(MY_ADDRESS,MSG_CONTINUE);
}).onFailure(f->{
System.out.println("failed to connect: " + f);
});
}
Upvotes: 0
Reputation: 17721
This could be divided into two questions:
1) How to listen continuously in VertX
2) How can we implement sending messages to only one verticle
As of first question, you can use setInterval:
vertx.setPeriodic(1000, t -> {
redis.blpop("myKey", 3500, h -> {
System.out.println(h.result()); // Need to check for success()
}
}));
This won't block the EventLoop
As of second question, use vertx.eventBus().send() to send a message to one of the consumers in a Round-Robin fashion: http://vertx.io/docs/vertx-core/java/#_the_event_bus_api
Upvotes: 1