Reputation: 133
I'm new to RxJava and trying to implement an Observable of messages from a RabbitMQ queue that supports lossless backpressure. I've managed to create an Observable from a Spring AMQP MessageListener. This handles backpressure fine in a synchronous environment (e.g. callstack blocking), but as soon as multiple threads are introduced, backpressure goes out of the window - as you would expect. The class is below:
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.subscriptions.Subscriptions;
import javax.inject.Inject;
@Component
public class CommandExchange {
private final MessageConverter messageConverter;
private final ConnectionFactory connectionFactory;
@Inject
public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) {
this.messageConverter = messageConverter;
this.connectionFactory = connectionFactory;
}
public <T extends Command> Observable<T> observeQueue(String... queueNames) {
return Observable.create(subscriber -> {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueNames);
container.setMessageListener((MessageListener) message -> {
T command = (T) messageConverter.fromMessage(message);
if (!subscriber.isUnsubscribed()) {
System.out.println("Being asked for a message.");
subscriber.onNext(command);
}
});
container.start();
Subscriptions.create(container::shutdown);
});
}
}
I can't get my head around how to implement lossless backpressue here without blocking or buffering. It doesn't make sense to use buffering as the Rabbit MQ queue is already a buffer - so a message should only be consumed from the queue when a subscriber is ready for it. Is the solution rather to use a pull-based observable (i.e. stop using a listener and instead grab a message when there is demand from the subscriber)? If so, what would be the best practice for handling the case where there are no messages currently on the queue?
Upvotes: 3
Views: 2500
Reputation: 12087
Yep I would stop using a listener and grab messages from the queue on demand. Request accounting and backpressure is all handled for you then if you use
Observable.create(new SyncOnSubscribe<T>() {...});
In SyncOnSubscribe
you more or less just specify the action that is taken to get one message (or none if there is none waiting).
Upvotes: 2