Azat

Reputation: 2335

Rx.NET GroupByUntil group termination, wait for thread completion

I have an infinite stream of objects. My requirement is that items from the observable stream with the same key are processed synchronously, one after another, while items with different keys may (and ideally should) be processed in parallel. The easiest way to do this (as mentioned in most places) is to use the GroupByUntil operator:

var results = observableStream
    .GroupByUntil(item => item.Id, group =>
        group.Throttle(TimeSpan.FromSeconds(30), scheduler))
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Select(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));

The code works well as long as I can guarantee that ProcessItem(item) takes less than 30 seconds. Otherwise group.Throttle(TimeSpan.FromSeconds(30), scheduler) closes the group's stream while an item is still being processed, and there is a very high probability that a new item with the same key arrives, opens a new group, and starts processing on another thread in parallel.

So basically I need to know when my thread has finished processing all the items with a specific key, and I need to signal that to the durationSelector parameter of the GroupByUntil operator.

Any ideas on how to achieve this? Thanks in advance.

Upvotes: 3

Views: 1218

Answers (2)

Theodor Zoulias

Reputation: 43545

It seems that you need a variant of the RxJS exhaustMap operator:

Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.

An Rx implementation of this operator (ExhaustMap) can be found here. In your case you just need to apply the same logic for each grouped subsequence of the observable sequence:

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence that has the same key has completed.</summary>
public static IObservable<TResult> ExhaustMapPerKey<TSource, TKey, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<TSource, TKey, IObservable<TResult>> function,
    IEqualityComparer<TKey> keyComparer = default)
{
    // Arguments validation omitted
    keyComparer ??= EqualityComparer<TKey>.Default;
    return source
        .GroupBy(keySelector, keyComparer)
        .SelectMany(group =>
        {
            int localMutex = 0; // 0: not acquired, 1: acquired
            return group.SelectMany(item =>
            {
                if (Interlocked.CompareExchange(ref localMutex, 1, 0) == 0)
                    return function(item, group.Key)
                        .Finally(() => Volatile.Write(ref localMutex, 0));
                return Observable.Empty<TResult>();
            });
        });
}

Usage example:

var results = observableStream
    .ExhaustMapPerKey(item => item.Id, (item, key) =>
        Observable.Start(() => ProcessItem(item), scheduler));
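
One caveat worth checking against the requirement in the question: like exhaustMap, the operator above ignores an item that arrives while an earlier item with the same key is still being processed, rather than queueing it. The following is a minimal console sketch of that behavior, assuming the System.Reactive package. The ExhaustDemo class, the Run method and the gates dictionary are hypothetical names used only for illustration, and each "processing" observable is a Subject that the demo completes by hand, so the outcome does not depend on timing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ExhaustDemo
{
    // ExhaustMapPerKey, reproduced from the answer above.
    public static IObservable<TResult> ExhaustMapPerKey<TSource, TKey, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TKey> keySelector,
        Func<TSource, TKey, IObservable<TResult>> function,
        IEqualityComparer<TKey> keyComparer = default)
    {
        keyComparer ??= EqualityComparer<TKey>.Default;
        return source
            .GroupBy(keySelector, keyComparer)
            .SelectMany(group =>
            {
                int localMutex = 0; // 0: not acquired, 1: acquired
                return group.SelectMany(item =>
                {
                    if (Interlocked.CompareExchange(ref localMutex, 1, 0) == 0)
                        return function(item, group.Key)
                            .Finally(() => Volatile.Write(ref localMutex, 0));
                    return Observable.Empty<TResult>();
                });
            });
    }

    // Returns the items whose processing was actually started.
    public static List<string> Run()
    {
        var started = new List<string>();
        // Hypothetical stand-in for long-running work: one Subject per item,
        // completed manually to mark "processing finished".
        var gates = new Dictionary<string, Subject<string>>();
        var source = new Subject<string>();

        IObservable<string> Process(string item)
        {
            started.Add(item);
            var gate = new Subject<string>();
            gates[item] = gate;
            return gate;
        }

        using var subscription = source
            .ExhaustMapPerKey(item => item[0], (item, key) => Process(item))
            .Subscribe();

        source.OnNext("A1");         // key A is idle: processing starts
        source.OnNext("A2");         // key A is busy: the item is ignored
        source.OnNext("B1");         // key B is idle: processing starts
        gates["A1"].OnCompleted();   // A1 finishes, key A becomes idle again
        source.OnNext("A3");         // key A is idle: processing starts

        return started;
    }

    public static void Main()
    {
        // Expected to list A1, B1 and A3, but not A2.
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

In this sketch A2 never starts, because A1 is still running when it arrives, while B1 has a different key and starts immediately. If every item must eventually be processed, the items have to be queued per key instead of skipped.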

Upvotes: 0

Shlomo

Reputation: 14350

This is very similar to this question: A way to push buffered events in even intervals.

From the answer to that question, there's an operator Drain:

public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}

Given that operator, your problem becomes very simple:

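var results = observableStream
    .GroupBy(item => item.Id)
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            // Drain expects a selector that returns an IObservable. Assuming
            // ProcessItem is synchronous (as in the question), it is wrapped
            // with Observable.Start here.
            .Drain(item => Observable.Start(() => ProcessItem(item), scheduler)));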

var disposable = results.Subscribe(result => SaveResults(result));

Given a stream that looks like A1, A2, B1, A3, B2, C1, B3, C2, GroupBy separates the streams by IDs:

A: A1, A2, A3
B: B1, B2, B3
C: C1, C2

...and Drain makes sure that the items within a given sub-stream are processed serially, not in parallel.
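
To check that claim on the sample stream, here is a minimal console sketch, assuming the System.Reactive package. The DrainDemo class, the Run method and the 50 ms ProcessItem stand-in are hypothetical and exist only for illustration. Because Drain's selector must return an observable, the synchronous ProcessItem is wrapped in Observable.Start on the task pool. The sketch counts how often two items with the same key were being processed at the same time.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class DrainDemo
{
    // Drain, reproduced from the answer above.
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            // One "token" is available at the start; a new one is issued
            // each time a projected sequence completes.
            var queue = new BehaviorSubject<Unit>(Unit.Default);
            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(Unit.Default)));
        });
    }

    public static (IList<string> Results, int Overlaps) Run()
    {
        var items = new[] { "A1", "A2", "B1", "A3", "B2", "C1", "B3", "C2" };
        var inFlight = new ConcurrentDictionary<char, int>(); // running items per key
        int overlaps = 0;

        // Hypothetical stand-in for ProcessItem: synchronous, takes about 50 ms.
        string ProcessItem(string item)
        {
            if (inFlight.AddOrUpdate(item[0], 1, (_, n) => n + 1) > 1)
                Interlocked.Increment(ref overlaps); // same key already running
            Thread.Sleep(50);
            inFlight.AddOrUpdate(item[0], 0, (_, n) => n - 1);
            return item;
        }

        var results = items.ToObservable()
            .GroupBy(item => item[0]) // key is the leading letter
            .SelectMany(group => group.Drain(item =>
                Observable.Start(() => ProcessItem(item), TaskPoolScheduler.Default)))
            .ToList()
            .Wait(); // block until every group has drained

        return (results, overlaps);
    }

    public static void Main()
    {
        var (results, overlaps) = Run();
        // The overlap count should be 0; the interleaving across keys may vary.
        Console.WriteLine($"overlaps: {overlaps}; results: {string.Join(", ", results)}");
    }
}
```

Within each key the results should come out in arrival order (A1, A2, A3 and so on), while the interleaving between different keys depends on thread scheduling. Note that plain GroupBy keeps every group alive for the lifetime of the subscription, which matters for an infinite stream with many distinct keys.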

Upvotes: 4
