Reputation: 2335
I have an infinite stream of objects. My requirement is that items from the observable stream that share the same key must be processed synchronously (one at a time), while items with different keys may, and ideally should, be processed in parallel. The easiest way to do this, as mentioned in most places, is to use the GroupByUntil operator:
var results = observableStream
    .GroupByUntil(item => item.Id, group =>
        group.Throttle(TimeSpan.FromSeconds(30), scheduler))
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            .Select(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));
The code works well as long as I can guarantee that ProcessItem(item) takes less than 30 seconds. Otherwise group.Throttle(TimeSpan.FromSeconds(30), scheduler) closes the group's stream, and there is a very high probability that a new item with the same key arrives and starts processing on a new thread.
So basically I need to know when my thread has finished processing all the items with a specific key, and I need to signal that through the durationSelector parameter of the GroupByUntil operator.
Any ideas on how to achieve this? Thanks in advance.
Upvotes: 3
Views: 1218
Reputation: 43545
It seems that you need a variant of the RxJS exhaustMap operator:
Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed.
An Rx implementation of this operator (ExhaustMap) can be found here. In your case you just need to apply the same logic to each grouped subsequence of the observable sequence:
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence that has the same key has completed.</summary>
public static IObservable<TResult> ExhaustMapPerKey<TSource, TKey, TResult>(
    this IObservable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<TSource, TKey, IObservable<TResult>> function,
    IEqualityComparer<TKey> keyComparer = default)
{
    // Arguments validation omitted
    keyComparer ??= EqualityComparer<TKey>.Default;
    return source
        .GroupBy(keySelector, keyComparer)
        .SelectMany(group =>
        {
            int localMutex = 0; // 0: not acquired, 1: acquired
            return group.SelectMany(item =>
            {
                if (Interlocked.CompareExchange(ref localMutex, 1, 0) == 0)
                    return function(item, group.Key)
                        .Finally(() => Volatile.Write(ref localMutex, 0));
                return Observable.Empty<TResult>();
            });
        });
}
Usage example:
var results = observableStream
    .ExhaustMapPerKey(item => item.Id, (item, key) =>
        Observable.Start(() => ProcessItem(item), scheduler));
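To make the "only if the previous projected observable has completed" part concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name ExhaustMapPerKeyDemo, the Run method and the string items are made up for illustration, and the operator is repeated (without the comparer parameter) only so that the snippet compiles on its own. Each item's "work" is a Subject that finishes when the demo says so, which keeps everything on one thread:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ExhaustMapPerKeyDemo
{
    // Same operator as above (comparer omitted), repeated so the sketch is self-contained.
    public static IObservable<TResult> ExhaustMapPerKey<TSource, TKey, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TKey> keySelector,
        Func<TSource, TKey, IObservable<TResult>> function)
    {
        return source
            .GroupBy(keySelector)
            .SelectMany(group =>
            {
                int localMutex = 0; // 0: not acquired, 1: acquired
                return group.SelectMany(item =>
                {
                    if (Interlocked.CompareExchange(ref localMutex, 1, 0) == 0)
                        return function(item, group.Key)
                            .Finally(() => Volatile.Write(ref localMutex, 0));
                    return Observable.Empty<TResult>();
                });
            });
    }

    // Hypothetical driver: returns the items whose processing was actually started.
    public static List<string> Run()
    {
        var source = new Subject<string>();
        var started = new List<string>();
        var running = new Dictionary<string, Subject<string>>();

        using var subscription = source
            .ExhaustMapPerKey(item => item[0], (item, key) =>
            {
                started.Add(item);
                var work = new Subject<string>(); // stays open until completed below
                running[item] = work;
                return work.AsObservable();
            })
            .Subscribe();

        source.OnNext("A1");         // key A is idle, so A1 starts
        source.OnNext("A2");         // A1 is still running, so A2 is ignored
        source.OnNext("B1");         // key B has its own mutex, so B1 starts
        running["A1"].OnCompleted(); // A1 finishes and releases key A
        source.OnNext("A3");         // key A is idle again, so A3 starts

        return started;
    }
}
```

Note that A2 is never processed in this sketch: an item that arrives while its key is busy is ignored rather than queued.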
Upvotes: 0
Reputation: 14350
This is very similar to this question: A way to push buffered events in even intervals.
From the answer to that question, there's an operator Drain:
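// Requires: using System.Reactive; using System.Reactive.Linq; using System.Reactive.Subjects;
public static class ObservableDrainExtensions
{
    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            // Emits one Unit up front, then one more each time a projected sequence completes.
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v) // releases the next source item only when a Unit is available
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(new Unit()))
                );
        });
    }
}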
Given that operator, your problem becomes very simple:
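var results = observableStream
    .GroupBy(item => item.Id)
    .SelectMany(group =>
        group
            .ObserveOn(scheduler)
            // Drain's selector must return an IObservable<TOut>, so ProcessItem
            // is assumed to return an observable here.
            .Drain(item => ProcessItem(item)));

var disposable = results.Subscribe(result => SaveResults(result));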
Given a stream that looks like A1, A2, B1, A3, B2, C1, B3, C2, GroupBy separates the streams by IDs:
A: A1, A2, A3
B: B1, B2, B3
C: C1, C2
...and Drain makes sure that the items in a given sub-stream run serially, not in parallel.
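As a rough end-to-end sketch of the GroupBy plus Drain idea on that sample stream, assuming the System.Reactive package is referenced: the class name DrainDemo, the Run method and this ProcessItem (which just sleeps and echoes its input) are stand-ins invented for illustration, and the synchronous work is wrapped in Observable.Start so that the selector returns an observable. Drain is repeated only so that the snippet compiles on its own:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class DrainDemo
{
    // Same Drain operator as above, repeated so the sketch is self-contained.
    public static IObservable<TOut> Drain<TSource, TOut>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TOut>> selector)
    {
        return Observable.Defer(() =>
        {
            var queue = new BehaviorSubject<Unit>(Unit.Default);
            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ => { }, () => queue.OnNext(Unit.Default)));
        });
    }

    // Hypothetical stand-in for the real work: sleeps briefly and echoes the item.
    private static string ProcessItem(string item)
    {
        Thread.Sleep(20);
        return item;
    }

    // Processes the sample stream and returns the results in completion order.
    public static IList<string> Run()
    {
        var items = new[] { "A1", "A2", "B1", "A3", "B2", "C1", "B3", "C2" };

        return items.ToObservable()
            .GroupBy(item => item[0]) // key is the letter: A, B or C
            .SelectMany(group => group.Drain(item =>
                Observable.Start(() => ProcessItem(item), TaskPoolScheduler.Default)))
            .ToList()
            .Wait(); // blocks until every group has drained
    }
}
```

Results for different keys may interleave in any order, but within one key they should come out in arrival order, because the next item of a key is only released after the previous one has completed.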
Upvotes: 4