Kent Boogaart

Reputation: 178770

Managing state in a reactive pipeline

I am constructing a reactive pipeline that needs to expand (SelectMany) and then flatten (in this case, ToArray) whilst maintaining access to a piece of state obtained at the beginning of the pipeline.

Here is pseudo-code for what I am attempting:

return Observable
    .Start(() => this.GetSearchResults(query))
    .SelectMany(results => results.Hits)     // results.Hits is a list of IDs, but results also has a bool property that I want to keep through to the end of my pipeline
    .SelectMany(hit => GetById(hit.Id))      // asynchronously load each result
    .ToArray()                               // now need to pull all the results together into a containing data structure, and also include the bool flag from above in it
    .Select(resolvedResults => new ...);     // need access to both resolvedResults and the bool mentioned in the first comment above

So I'm trying to find a way to cleanly access some state determined at the beginning of the pipeline from the code at the end of the pipeline.

The first thing I tried was using anonymous types to bundle the bool with each result. This quickly got out of hand and was wasteful from a performance perspective.

The second thing I tried was using a subject as follows:

var state = new AsyncSubject<bool>();
return Observable
    .Start(() => this.GetSearchResults(query))
    .Do(results =>
        {
            state.OnNext(results.Flag);
            state.OnCompleted();
        })
    .SelectMany(results => results.Hits)
    .SelectMany(hit => GetById(hit.Id))
    .ToArray()
    .Zip(
        state,
        (results, flag) => new ResultContainer(flag, results));

This seems to work fine, but feels a little icky to me.

So what I'm wondering is whether there is a cleaner way to manage state in a reactive pipeline.

For reference, here is the actual code (rather than just pseudo-code):

public IObservable<ISearchResults<IContact>> Search(string query, int maximumResultCount = 100, float minimumScore = 0.1F)
{
    Ensure.ArgumentNotNull(query, nameof(query));

    var moreHitsAvailable = new AsyncSubject<bool>();

    return Observable
        .Start(
            () => this.searchIndexService.Search<IContact>(query, maximumResultCount, minimumScore),
            this.schedulerService.DataStoreScheduler)
        .Do(
            results =>
            {
                moreHitsAvailable.OnNext(results.MoreHitsAreAvailable);
                moreHitsAvailable.OnCompleted();
            })
        .SelectMany(
            results => results
                .Hits
                .Select(
                    hit => new
                    {
                        Id = hit.Id,
                        ParsedId = ContactId.Parse(hit.Id)
                    }))
        .SelectMany(
            result => this
                .GetById(result.ParsedId)
                .Select(
                    contact => new
                    {
                        Id = result.Id,
                        Contact = contact
                    }))
        .Do(
            result =>
            {
                if (result.Contact == null)
                {
                    this.logger.Warn("Failed to find contact with ID '{0}' provided by the search index. Index may be out of date.", result.Id);
                }
            })
        .Select(result => result.Contact)
        .Where(contact => contact != null)
        .ToArray()
        .Zip(
            moreHitsAvailable,
            (results, more) => new SearchResults<IContact>(more, results.ToImmutableList()))
        .PublishLast()
        .ConnectUntilCompleted();
}

Upvotes: 3

Views: 170

Answers (1)

Lee Campbell

Reputation: 10783

You could pop out to query comprehension syntax and do something like this:

var x = from result in Observable.Start(() => this.GetSearchResults(query))
        let hasMore = result.MoreHitsAreAvailable
        from hit in result.Hits
        from contact in GetById(hit.Id)
        select new { hasMore, contact };

How you deal with the duplicate hasMore values is over to you. Since we know every element carries the same single value (all true or all false), you could group by it.
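One way to sidestep the duplicate values entirely is to nest the hit expansion inside the outer query, so that result is still in scope when the container is built. The following is only a rough sketch under assumptions, not the code from the question: the Hit, SearchResult and ResultContainer records and the stubbed GetSearchResults and GetById methods are hypothetical stand-ins, and it assumes the System.Reactive package and C# 9 or later (for records).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical stand-ins for the types used in the question.
public sealed record Hit(string Id);
public sealed record SearchResult(bool MoreHitsAreAvailable, IReadOnlyList<Hit> Hits);
public sealed record ResultContainer(bool HasMore, IReadOnlyList<string> Contacts);

public static class SearchSketch
{
    // Hypothetical synchronous search: two hits plus the flag to carry along.
    private static SearchResult GetSearchResults(string query) =>
        new SearchResult(true, new[] { new Hit("1"), new Hit("2") });

    // Hypothetical asynchronous lookup of a single contact by ID.
    private static IObservable<string> GetById(string id) =>
        Observable.Return("contact-" + id);

    public static IObservable<ResultContainer> Search(string query) =>
        from result in Observable.Start(() => GetSearchResults(query))
        // The inner query expands the hits and collects them again with ToArray,
        // all while 'result' (and therefore its flag) remains in scope.
        from contacts in (
            from hit in result.Hits.ToObservable()
            from contact in GetById(hit.Id)
            select contact).ToArray()
        select new ResultContainer(result.MoreHitsAreAvailable, contacts);
}
```

Because the flag is read once from result rather than copied onto every element, there is nothing to de-duplicate afterwards. Unlike grouping, this also still yields a container when the search returns no hits, since ToArray emits an empty array in that case.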

Upvotes: 2
