Mark Sowul

Reactive Extensions: how to combine / join / look up items across two sequences

I am connecting to a web service that gives me all prices for a day (without time info). Each of those price results has the id for a corresponding "batch run".

The "batch run" has a date+time stamp, but I have to make a separate call to get all the batch info for the day.

Hence, to get the actual time of each result, I need to combine the two API calls.

I'm using Reactive Extensions (Rx.NET) for this, but I can't reliably combine the two sets of data. I thought CombineLatest would do it (based on http://reactivex.io/documentation/operators/combinelatest.html, http://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#CombineLatest), but it doesn't work the way I expected.

    using System;
    using System.Linq;
    using System.Reactive.Linq;
    using System.Threading.Tasks;
    using Microsoft.VisualStudio.TestTools.UnitTesting;

    [TestClass]
    public class CombineLatestTests
    {
        [TestMethod]
        public async Task EvenMoreBasicCombineLatestTest()
        {
            int batchStart = 100, batchCount = 10;

            //create 10 results with batch ids [100, 109]
            //the test uses lists just to make debugging easier
            var resultsWithBatchIdList = Enumerable.Range(batchStart, batchCount)
                .Select(id => new { BatchRunId = id, ResultValue = id * 10 })
                .ToList();
            var resultsWithBatchId = Observable.ToObservable(resultsWithBatchIdList);
            Assert.AreEqual(batchCount, await resultsWithBatchId.Count());

            //create 10 batches with ids [100, 109]
            var batchesList = Enumerable.Range(batchStart, batchCount)
                .Select(id => new
                {
                    ThisId = id,
                    BatchName = String.Concat("abcd", id)
                })
                .ToList();
            var batchesObservable = Observable.ToObservable(batchesList);
            Assert.AreEqual(batchCount, await batchesObservable.Count());

            //turn the batch set into a dictionary so we can look up each batch by its id
            //(ToDictionary emits a single dictionary, once batchesObservable completes)
            var batchRunsByIdObservable = batchesObservable.ToDictionary(batch => batch.ThisId);

            //for each result, look up the corresponding batch id in the dictionary to join them together
            var resultsWithCorrespondingBatch =
                batchRunsByIdObservable
                .CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
                {
                    Assert.AreEqual(batchCount, batchRunsById.Count);

                    var correspondingBatch = batchRunsById[result.BatchRunId];

                    var priceResultAndSourceBatch = new
                    {
                        Result = result,
                        SourceBatchRun = correspondingBatch
                    };
                    return priceResultAndSourceBatch;
                });

            //this is the assertion that fails: only the last result gets joined
            Assert.AreEqual(batchCount, await resultsWithCorrespondingBatch.Count());
        }
    }

I expected that as each element of the results observable comes through, it would be combined with the element of the batch dictionary observable (which only ever emits one element). Instead, it looks like only the last element of the result list gets joined.

My real problem is more complex, but while trying to create a minimal repro, even this gives me unexpected results. It happens with Rx versions 3.1.1, 4.0.0, 4.2.0, etc.

(Note that in general the two sequences don't line up one-to-one as they do in this artificial example, so I can't just Zip them.)
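
A minimal sketch of why Zip is the wrong tool once the sequences don't line up, assuming Rx.NET (the System.Reactive package); the class name and the ids are made up for illustration. Zip pairs the n-th result with the n-th batch regardless of ids, so a batch with two results shifts every later result onto the wrong batch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical illustration class, not part of the original code.
public static class ZipMismatchSketch
{
    // Returns the (result's BatchRunId, batch Id) pairs that Zip produces.
    public static IList<(int ResultBatchId, int BatchId)> Run()
    {
        // Made-up data: batch 100 produced two results, batch 101 produced one.
        var resultBatchIds = new[] { 100, 100, 101 }.ToObservable();
        var batchIds = new[] { 100, 101 }.ToObservable();

        // Zip pairs purely by position and never looks at the ids.
        return resultBatchIds
            .Zip(batchIds, (resultBatchId, batchId) =>
                (ResultBatchId: resultBatchId, BatchId: batchId))
            .ToList()
            .Wait();
    }
}
```

Here the second result (batch 100) is paired with batch 101, and the third result is never paired at all because there is no third batch.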

So how can I do this join: take a stream of results and look up more information for each one in a Dictionary that itself comes from an Observable?

Also note that the goal is to return the IObservable (resultsWithCorrespondingBatch), so I can't just await the batchRunsByIdObservable.

Answers (1)

Mark Sowul

OK, I think I figured it out. I wish either of the two marble diagrams in the documentation had been just slightly different; a diagram like this one would have made a subtlety of CombineLatest much more obvious (N emits numbers, L emits letters, and R shows the combined pairs):

    N------1---2---3---
    L--z--a------bc----

    R------1---2-223---
           a   a bcc
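
A minimal sketch that replays the diagram, assuming Rx.NET (System.Reactive) and using `Subject<T>` so the emission order is explicit; the class and method names are hypothetical. The second method approximates the repro: the single "dictionary" value only arrives after the results have gone by (as it does when it comes from ToDictionary, which emits on completion), so it is paired only with the latest result.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration class, not part of the original code.
public static class CombineLatestSketch
{
    // Replays the marble diagram: N = numbers, L = letters.
    public static List<string> MarbleDiagram()
    {
        var numbers = new Subject<int>();
        var letters = new Subject<char>();
        var output = new List<string>();

        using (numbers.CombineLatest(letters, (n, l) => $"{n}{l}").Subscribe(output.Add))
        {
            letters.OnNext('z');  // no number yet, so z is never combined
            letters.OnNext('a');
            numbers.OnNext(1);    // 1a
            numbers.OnNext(2);    // 2a
            letters.OnNext('b');  // 2b
            letters.OnNext('c');  // 2c
            numbers.OnNext(3);    // 3c
        }
        return output;
    }

    // Approximates the question: every result is emitted before the single dictionary value.
    public static List<string> LateSingleValue()
    {
        var results = new Subject<int>();
        var dictionary = new Subject<string>();
        var output = new List<string>();

        using (dictionary.CombineLatest(results, (d, r) => $"{d}:{r}").Subscribe(output.Add))
        {
            for (int r = 100; r < 110; r++)
                results.OnNext(r);      // nothing is emitted: no dictionary value yet
            dictionary.OnNext("dict");  // pairs only with the latest result, 109
        }
        return output;
    }
}
```

`MarbleDiagram()` returns `1a, 2a, 2b, 2c, 3c`, and `LateSingleValue()` returns the single element `dict:109`.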

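It's combine *latest*: each element is only paired with the most recent element of the other sequence, so depending on when items are emitted, some pairs never appear (in the diagram, z is emitted before N has produced anything, so it is never combined). In my test, ToDictionary only emits its single dictionary once the batch sequence completes; by then the results had already gone by, so only the latest one was paired with it. What I should have done is use SelectMany: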

    // NO:
    .CombineLatest(resultsWithBatchId, (batchRunsById, result) =>
    // YES (the nested Select needs one more closing parenthesis at the end):
    .SelectMany(batchRunsById => resultsWithBatchId.Select(result =>
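
A minimal sketch of the whole SelectMany join as a method that returns an IObservable without awaiting anything, assuming Rx.NET (System.Reactive) and C# 9 or later for records. `PriceResult`, `BatchRun` and `JoinWithBatches` are hypothetical stand-ins for the web-service types, not a real API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class BatchJoinSketch
{
    // Hypothetical stand-ins for the web-service DTOs.
    public sealed record PriceResult(int BatchRunId, int ResultValue);
    public sealed record BatchRun(int Id, string BatchName);

    // Nothing runs until someone subscribes to the returned observable.
    public static IObservable<(PriceResult Result, BatchRun Batch)> JoinWithBatches(
        IObservable<PriceResult> results, IObservable<BatchRun> batches)
    {
        return batches
            .ToDictionary(batch => batch.Id)   // one dictionary, emitted when batches completes
            .SelectMany(batchRunsById =>       // then subscribe to results once, for that dictionary
                results.Select(result => (
                    Result: result,
                    // a missing id throws, which Rx turns into OnError
                    Batch: batchRunsById[result.BatchRunId])));
    }

    // Same shape of data as the test in the question: ids 100..109 on both sides.
    public static IList<(PriceResult Result, BatchRun Batch)> Demo()
    {
        var results = Enumerable.Range(100, 10)
            .Select(id => new PriceResult(id, id * 10)).ToObservable();
        var batches = Enumerable.Range(100, 10)
            .Select(id => new BatchRun(id, "abcd" + id)).ToObservable();
        return JoinWithBatches(results, batches).ToList().Wait();
    }
}
```

Because `results` is only subscribed after the dictionary arrives, this relies on `results` being cold (re-running from the start on subscription); a hot source would lose anything it emitted before that point.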

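Note that the order of the "join" matters: A.SelectMany(a => B) subscribes to B once per item of A, whereas B.SelectMany(b => A) subscribes to A once per item of B. If A has 1 item and B has 100 items, the latter results in 100 subscriptions to A; for a cold source such as a web-service call, each subscription may mean another call.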
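
A minimal sketch that counts subscriptions for both join orders, assuming Rx.NET (System.Reactive); `JoinOrderSketch` is a hypothetical name, and `Observable.Defer` is used only because its factory runs once per subscription, which makes it a convenient counter.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical illustration class, not part of the original code.
public static class JoinOrderSketch
{
    // Joins a 1-item sequence A with a 100-item sequence B in the given order
    // and reports how many times each source was subscribed to.
    public static (int Pairs, int SubscriptionsToA, int SubscriptionsToB) Run(bool aIsOuter)
    {
        int subscriptionsToA = 0, subscriptionsToB = 0;

        // Each Defer factory call corresponds to one subscription
        // (for a cold web-service source, one more call).
        var a = Observable.Defer(() => { subscriptionsToA++; return Observable.Return(0); });
        var b = Observable.Defer(() => { subscriptionsToB++; return Observable.Range(1, 100); });

        var joined = aIsOuter
            ? a.SelectMany(_ => b)   // B subscribed once per item of A
            : b.SelectMany(_ => a);  // A subscribed once per item of B

        int pairs = joined.Count().Wait();
        return (pairs, subscriptionsToA, subscriptionsToB);
    }
}
```

Both orders produce 100 pairs, but with A outer each source is subscribed once, while with B outer A is subscribed 100 times.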
