Chris Tavares
Chris Tavares

Reputation: 30411

Using Observables to process queue messages which require a callback at end of processing?

This is a bit of a conceptual question, so let me know if it's off topic.

I'm looking at writing yet another library to process messages off a queue - in this case an Azure storage queue. It's pretty easy to create an observable and throw a message into it every time a message is available.

However, there's a snag here that I'm not sure how to handle. The issue is this: when you're done processing the message, you need to call an API on the storage queue to actually delete the message. Otherwise the visibility timeout will expire and the message will reappear to be dequeued again.

As an example, here's how this loop looks in C#:

    public event EventHandler<string> OnMessage;

    public void Run()
    {
        while(true)
        {
            // Read message
            var message = queue.GetMessage();

            if (message != null)
            {
                // Run any handlers
                OnMessage?.Invoke(this, message.AsString);

                // Delete off queue when done
                queue.DeleteMessage(message);
            }
            else
            {
                Thread.Sleep(2500);
            }
        }
    }

The important thing here is that we read the message, trigger any registered event handlers to do things, then delete the message after the handlers are done. I've omitted error handling here, but in general if the handler fails we should NOT delete the message, but instead let it return to visibility automatically and get redelivered later.

How do you handle this kind of thing using Rx? Ideally I'd like to expose the observable for anyone to subscribe to. But I need to do stuff at the end of processing for that message, whatever the "end" happens to mean here.

I can think of a couple of possible solutions, but I don't really like any of them. One would be to have the library call a function supplied by the consumer, that takes in the source observable, hooks up whatever it wants, then returns a new observable that the library can then subscribe on to do the final cleanup. But that's pretty limiting, as consumers basically only have one shot to hook up to the messages, which seems pretty limiting.

I guess I could put the call to delete the message after the call to onNext, but then I don't know if the processing succeeded or failed unless there's some sort of back channel in that api I don't know about?

Any ideas/suggestions/previous experience here?

Upvotes: 2

Views: 1107

Answers (1)

Enigmativity
Enigmativity

Reputation: 117174

Try having a play with this:

IObservable<int> source =
    Observable
        .Range(0, 3)
        .Select(x =>
            Observable
                .Using(
                    () => Disposable.Create(() => Console.WriteLine($"Removing {x}")),
                    d => Observable.Return(x)))
        .Merge();

source
    .Subscribe(x => Console.WriteLine($"Processing {x}"));

It produces:

Processing 0
Removing 0
Processing 1
Removing 1
Processing 2
Removing 2

Upvotes: 0

Related Questions