Nik

Reputation: 23

Memory efficient GroupBy + Aggregation using Rx

I have a sequence of items, and want to group them by a key and calculate several aggregations for each key.

The number of items is large, but the number of distinct keys is small.

A toy example:

static List<(string Key, decimal Sum, int Count)> GroupStats(
    IEnumerable<(string Key, decimal Value)> items)
{
    return items
        .GroupBy(x => x.Key)
        .Select(g => (
            Key : g.Key,
            Sum : g.Sum(x => x.Value),
            Count : g.Count()
        ))
        .ToList();
}

LINQ's GroupBy has the unfortunate consequence that it buffers all the items in memory before it yields the first group.

An imperative implementation would only consume memory proportional to the number of distinct keys, but I'm wondering if there is a nicer solution.
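For reference, the imperative version mentioned above could look roughly like the following. This is only a minimal sketch: the class name ImperativeGrouping is hypothetical, and it keeps one running (Sum, Count) pair per distinct key in a dictionary.

```csharp
using System.Collections.Generic;
using System.Linq;

static class ImperativeGrouping
{
    // Single pass over the items; memory grows with the number of distinct keys only.
    public static List<(string Key, decimal Sum, int Count)> GroupStats(
        IEnumerable<(string Key, decimal Value)> items)
    {
        var stats = new Dictionary<string, (decimal Sum, int Count)>();
        foreach (var (key, value) in items)
        {
            // For a key not seen before, acc is left at its default, (0, 0).
            stats.TryGetValue(key, out var acc);
            stats[key] = (acc.Sum + value, acc.Count + 1);
        }
        return stats
            .Select(kv => (kv.Key, kv.Value.Sum, kv.Value.Count))
            .ToList();
    }
}
```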

The "push" approach of Reactive Extensions (Rx) should theoretically enable low-memory grouping as well, but I couldn't find a way to escape from IObservable and materialize the actual values. I'm also open to other elegant solutions (besides the obvious imperative implementation).

Upvotes: 2

Views: 479

Answers (2)

Theodor Zoulias

Reputation: 43474

You could do this:

static IList<(string Key, decimal Sum, int Count)> GroupStats(
    IEnumerable<(string Key, decimal Value)> source)
{
    return source
        .ToObservable()
        .GroupBy(x => x.Key)
        .Select(g => (
            Key: g.Key,
            Sum: g.Sum(x => x.Value).PublishLast().AutoConnect(0),
            Count: g.Count().PublishLast().AutoConnect(0)
        ))
        .ToList()
        .Wait()
        .Select(e => (e.Key, e.Sum.Wait(), e.Count.Wait()))
        .ToArray();
}
  • With the ToObservable operator, the IEnumerable<T>¹ source is converted to an IObservable<T> sequence.

  • The GroupBy converts the IObservable<T> to an IObservable<IGroupedObservable<string, T>>.

  • The Select converts each IGroupedObservable<string, T> to a (string, IObservable<decimal>, IObservable<int>). The PublishLast is used in order to remember the last (and only) value emitted by the Sum and Count operators. The AutoConnect(0) subscribes to these subsequences immediately when they are emitted.

  • The ToList converts the IObservable<T> to an IObservable<IList<T>>. The outer observable will emit a single list when it is completed.

  • The Wait waits synchronously for the outer observable to complete and to emit the single list. This is where all the work happens. Until this point the source sequence has not been enumerated. The Wait subscribes to the observable that has been constructed so far, which triggers subscriptions to the underlying observables, and eventually triggers the enumeration of the source. All the calculations are performed synchronously during the subscriptions, on the current thread. So the verb "wait" does not accurately describe what is happening here.

  • The next Select converts each (string, IObservable<decimal>, IObservable<int>) to a (string, decimal, int), by waiting on the subsequences. These subsequences have already completed at this point, and their single output is stored inside the PublishLast. So these inner Wait invocations do not trigger any serious work. All the heavy work has already been done in the previous step.

  • Finally the ToArray converts the IEnumerable<(string, decimal, int)> to an array of (string, decimal, int), which is the output of the GroupStats method.

¹ I am using the T as placeholder for a complex ValueTuple, so that the explanation is not overly verbose.
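The role of PublishLast and AutoConnect(0) described above can be seen in isolation with a small sketch. Here a Subject<decimal> stands in for one of the groups emitted by GroupBy; that substitution, and the class name PublishLastDemo, are assumptions made for the demonstration only.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class PublishLastDemo
{
    public static decimal Run()
    {
        // Stand-in for a single group emitted by GroupBy (assumption for this demo).
        var subsequence = new Subject<decimal>();

        // AutoConnect(0) connects right away, before any value has flowed,
        // and PublishLast keeps the final result of Sum for later subscribers.
        IObservable<decimal> sum = subsequence.Sum().PublishLast().AutoConnect(0);

        subsequence.OnNext(1m);
        subsequence.OnNext(2m);
        subsequence.OnCompleted();

        // The subsequence has already completed, so Wait only reads the stored value.
        return sum.Wait(); // 3
    }
}
```

Subscribing to the sum only at the end, without the PublishLast and AutoConnect(0) pair, would be too late: the values pushed through the subsequence before that subscription would never reach the Sum operator.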


Update: The Rx ToObservable operator has quite a lot of overhead, because it has to support the Rx scheduling infrastructure. You can replace it with the ToObservableHypersonic below, and achieve a speed-up of around 5x:

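// Requires: using System.Reactive.Disposables; using System.Reactive.Linq;
// Like any extension method, this must be declared inside a static class.
public static IObservable<TSource> ToObservableHypersonic<TSource>(
    this IEnumerable<TSource> source)
{
    return Observable.Create<TSource>(observer =>
    {
        // Push every item synchronously on the subscribing thread, bypassing the Rx schedulers.
        foreach (var item in source) observer.OnNext(item);
        observer.OnCompleted();
        return Disposable.Empty; // nothing left to dispose: the loop has already finished
    });
}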

I should also mention an alternative to the PublishLast+AutoConnect(0) combination, which is to convert the subsequences to tasks with the ToTask method. It has the same effect: the subsequences are subscribed to immediately and their last value is cached.

    Sum: g.Sum(x => x.Value).ToTask(),
    Count: g.Count().ToTask()
//...
.Select(e => (e.Key, e.Sum.Result, e.Count.Result))
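Put together with the method at the top of this answer, the ToTask variant would look roughly like this. It is a sketch that only combines the two fragments; the class name ToTaskVariant is hypothetical, and ToTask comes from the System.Reactive.Threading.Tasks namespace.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

static class ToTaskVariant
{
    public static IList<(string Key, decimal Sum, int Count)> GroupStats(
        IEnumerable<(string Key, decimal Value)> source)
    {
        return source
            .ToObservable()
            .GroupBy(x => x.Key)
            .Select(g => (
                Key: g.Key,
                // ToTask subscribes immediately and stores the last value in the task.
                Sum: g.Sum(x => x.Value).ToTask(),
                Count: g.Count().ToTask()
            ))
            .ToList()
            .Wait()
            // The tasks are expected to be completed already, so Result does not block.
            .Select(e => (e.Key, e.Sum.Result, e.Count.Result))
            .ToArray();
    }
}
```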

Upvotes: 1

Enigmativity

Reputation: 117037

I wonder if this is a simpler implementation:

static IList<(string Key, decimal Sum, int Count)> GroupStats(
    IEnumerable<(string Key, decimal Value)> source)
{
    return source
        .ToObservable(Scheduler.Immediate)
        .GroupBy(x => x.Key)
        .SelectMany(
            g => g.Aggregate(
                (Sum: 0m, Count: 0),
                (a, x) => (a.Sum + x.Value, a.Count + 1)), 
            (x, y) => (Key: x.Key, Sum: y.Sum, Count: y.Count)) 
        .ToList()
        .Wait();
}

Or better, a non-blocking version:

static async Task<IList<(string Key, decimal Sum, int Count)>> GroupStats(
    IEnumerable<(string Key, decimal Value)> source)
{
    return await source
        .ToObservable(Scheduler.Immediate)
        .GroupBy(x => x.Key)
        .SelectMany(
            g => g.Aggregate(
                (Sum: 0m, Count: 0),
                (a, x) => (a.Sum + x.Value, a.Count + 1)), 
            (x, y) => (Key: x.Key, Sum: y.Sum, Count: y.Count)) 
        .ToList();
}
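The seed and accumulator passed to Aggregate fold each group into a single (Sum, Count) pair in one pass. As an illustration only, here is the same fold applied to a plain array with LINQ's Enumerable.Aggregate; the class name FoldDemo and the sample values are hypothetical.

```csharp
using System.Linq;

static class FoldDemo
{
    public static (decimal Sum, int Count) Run()
    {
        var values = new[] { 2m, 5m, 6m };

        // Same seed and accumulator shape as in the Rx version above:
        // start from (0, 0), then add each value and bump the counter.
        return values.Aggregate(
            (Sum: 0m, Count: 0),
            (a, x) => (a.Sum + x, a.Count + 1)); // (13, 3)
    }
}
```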

If I run the async version with this source:

var source = new[]
{
    (Key: "a", Value: 1m),
    (Key: "c", Value: 2m),
    (Key: "b", Value: 3m),
    (Key: "b", Value: 4m),
    (Key: "c", Value: 5m),
    (Key: "c", Value: 6m),
};

var output = await GroupStats(source);

I get this output:

[screenshot: output]

Upvotes: 0
