Reputation: 178770
I am constructing a reactive pipeline that needs to expand (SelectMany) and then flatten (in this case, ToArray) whilst maintaining access to a piece of state obtained at the beginning of the pipeline.
Here is pseudo-code for what I am attempting:
    return Observable
        .Start(() => this.GetSearchResults(query))
        .SelectMany(results => results.Hits) // results.Hits is a list of IDs, but results also has a bool property that I want to keep through to the end of my pipeline
        .SelectMany(hit => GetById(hit.Id)) // asynchronously load each result
        .ToArray() // now need to pull all the results together into a containing data structure, and also include the bool flag from above in it
        .Select(resolvedResults => new ...); // need access to both resolvedResults and the bool mentioned in the first comment above
So I'm trying to find a way to cleanly access some state determined at the beginning of the pipeline from the code at the end of the pipeline.
The first thing I tried was using anonymous types to bundle the bool with each result. This quickly got out of hand and was wasteful from a performance perspective.
The second thing I tried was using a subject as follows:
    var state = new AsyncSubject<bool>();
    return Observable
        .Start(() => this.GetSearchResults(query))
        .Do(results =>
        {
            // capture the flag once, then complete so the subject replays it to the Zip below
            state.OnNext(results.Flag);
            state.OnCompleted();
        })
        .SelectMany(results => results.Hits)
        .SelectMany(hit => GetById(hit.Id))
        .ToArray()
        .Zip(
            state,
            (results, flag) => new ResultContainer(flag, results)); // parameter named flag so it does not clash with the local named state
This seems to work fine, but feels a little icky to me.
So what I'm wondering is whether there is a cleaner way to manage state in a reactive pipeline.
For reference, here is the actual code (rather than just pseudo-code):
    public IObservable<ISearchResults<IContact>> Search(string query, int maximumResultCount = 100, float minimumScore = 0.1F)
    {
        Ensure.ArgumentNotNull(query, nameof(query));

        var moreHitsAvailable = new AsyncSubject<bool>();

        return Observable
            .Start(
                () => this.searchIndexService.Search<IContact>(query, maximumResultCount, minimumScore),
                this.schedulerService.DataStoreScheduler)
            .Do(
                results =>
                {
                    moreHitsAvailable.OnNext(results.MoreHitsAreAvailable);
                    moreHitsAvailable.OnCompleted();
                })
            .SelectMany(
                results => results
                    .Hits
                    .Select(
                        hit => new
                        {
                            Id = hit.Id,
                            ParsedId = ContactId.Parse(hit.Id)
                        }))
            .SelectMany(
                result => this
                    .GetById(result.ParsedId)
                    .Select(
                        contact => new
                        {
                            Id = result.Id,
                            Contact = contact
                        }))
            .Do(
                result =>
                {
                    if (result.Contact == null)
                    {
                        this.logger.Warn("Failed to find contact with ID '{0}' provided by the search index. Index may be out of date.", result.Id);
                    }
                })
            .Select(result => result.Contact)
            .Where(contact => contact != null)
            .ToArray()
            .Zip(
                moreHitsAvailable,
                (results, more) => new SearchResults<IContact>(more, results.ToImmutableList()))
            .PublishLast()
            .ConnectUntilCompleted();
    }
Upvotes: 3
Views: 170
Reputation: 10783
You could pop out to Query Comprehension Syntax and do something like this:

    var x = from result in Observable.Start(() => this.GetSearchResults())
            let hasMore = result.MoreHitsAreAvailable
            from hit in result.Hits
            from contact in GetById(hit.Id)
            select new { hasMore, contact };
It is over to you how to deal with the duplicate hasMore values. Since we know there will be just a single distinct value (all true or all false), you could group by it.
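As a rough sketch of that group-by idea (not a definitive implementation), the query can be grouped on hasMore and each group collapsed with ToArray. The search and getById values below are hypothetical stand-ins for GetSearchResults and GetById, with made-up data, and the example assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in for the search call: a single result with a flag and some hit IDs.
var search = Observable.Return(new { MoreHitsAreAvailable = true, Hits = new[] { 1, 2, 3 } });

// Hypothetical stand-in for GetById: resolves each hit ID to a "contact" string.
Func<int, IObservable<string>> getById = id => Observable.Return("contact-" + id);

// Same shape as the query above: every contact is paired with the flag.
var pairs =
    from result in search
    let hasMore = result.MoreHitsAreAvailable
    from hit in result.Hits
    from contact in getById(hit)
    select new { hasMore, contact };

// Group on the flag (only one distinct key is expected), then flatten each group
// into an array so the flag and the resolved contacts end up in one container.
var containers = pairs
    .GroupBy(pair => pair.hasMore, pair => pair.contact)
    .SelectMany(g => g
        .ToArray()
        .Select(contacts => new { HasMore = g.Key, Contacts = contacts }));

// Block for the single container; this is only for demonstration purposes.
var container = containers.Wait();
Console.WriteLine($"HasMore={container.HasMore}, contacts={container.Contacts.Length}");
```

One caveat with this approach: if the search returns no hits, no group is ever created, so nothing is emitted and the flag is lost. If an empty result still needs to carry the flag, that case would have to be handled separately.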
Upvotes: 2