Angus Goldsmith

Reputation: 133

How to implement backpressure in an RxJava RabbitMQ Observable?

I'm new to RxJava and trying to implement an Observable of messages from a RabbitMQ queue that supports lossless backpressure. I've managed to create an Observable from a Spring AMQP MessageListener. This handles backpressure fine in a synchronous environment (i.e. when the call stack blocks), but as soon as multiple threads are introduced, backpressure goes out of the window, as you would expect. The class is below:

import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;
import rx.Observable;
import rx.subscriptions.Subscriptions;

import javax.inject.Inject;

@Component
public class CommandExchange {
    private final MessageConverter messageConverter;
    private final ConnectionFactory connectionFactory;

    @Inject
    public CommandExchange(MessageConverter messageConverter, ConnectionFactory connectionFactory) {
        this.messageConverter = messageConverter;
        this.connectionFactory = connectionFactory;
    }

    public <T extends Command> Observable<T> observeQueue(String... queueNames) {
        return Observable.create(subscriber -> {

            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueueNames(queueNames);
            container.setMessageListener((MessageListener) message -> {
                T command = (T) messageConverter.fromMessage(message);
                if (!subscriber.isUnsubscribed()) {
                    System.out.println("Being asked for a message.");
                    subscriber.onNext(command);
                }
            });
            container.start();

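            // Register the teardown with the subscriber so the container is shut down on unsubscribe.
            subscriber.add(Subscriptions.create(container::shutdown));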

        });
    }
}

I can't get my head around how to implement lossless backpressure here without blocking or buffering. It doesn't make sense to use buffering, as the RabbitMQ queue is already a buffer, so a message should only be consumed from the queue when a subscriber is ready for it. Is the solution rather to use a pull-based observable (i.e. stop using a listener and instead grab a message when there is demand from the subscriber)? If so, what would be the best practice for handling the case where there are no messages currently on the queue?

Upvotes: 3

Views: 2500

Answers (1)

Dave Moten

Reputation: 12087

Yep, I would stop using a listener and grab messages from the queue on demand. Request accounting and backpressure are then all handled for you if you use

Observable.create(new SyncOnSubscribe<S, T>() {...});

In SyncOnSubscribe you more or less just specify the action that is taken to get one message (or none if there is none waiting).
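As a rough sketch of that idea (not a drop-in implementation), the snippet below uses SyncOnSubscribe.createStateless so no state type is needed. A java.util.concurrent.BlockingQueue stands in for the RabbitMQ queue here, and the class name PullOnDemand, the method observe and the 100 ms timeout are all made up for illustration; against a real broker you would swap the poll for something like RabbitTemplate.receive(queueName) plus the message converter. The timed poll is one way to handle the "no messages right now" case: as far as I understand it, when the callback returns without calling onNext it is simply invoked again, so a short blocking wait avoids a hot spin.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observables.SyncOnSubscribe;

// Hypothetical helper: pulls one item per unit of downstream demand.
final class PullOnDemand {

    // 'source' is a stand-in for the broker queue (assumption for this sketch).
    static <T> Observable<T> observe(BlockingQueue<T> source) {
        return Observable.create(SyncOnSubscribe.<T>createStateless(observer -> {
            try {
                // Wait briefly for a message; null means nothing arrived in time.
                T message = source.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    // At most one onNext per invocation, as SyncOnSubscribe requires.
                    observer.onNext(message);
                }
                // If nothing was emitted, the callback is expected to be called again.
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                observer.onError(e);
            }
        }));
    }
}
```

Nothing is taken from the queue until the subscriber has requested it. Note that an operator such as observeOn requests items in batches, so with multiple threads a handful of messages may still be pulled ahead of the consumer; none are dropped, though.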

Upvotes: 2
