João Bragança
João Bragança

Reputation: 1383

Observable.Zip when number of sequences to zip is unknown until runtime

I need to model an approval process. Before it was pretty simple. Two roles had to approve something, and then we could go on to the next step:

public class Approved
{
    public string ApproverRole;
}

var approvals = Subscribe<Approved>();

var vpOfFinance = approvals.Where(e => e.ApproverRole == "Finance VP");
var vpOfSales = approvals.Where(e => e.ApproverRole == "Sales VP");

var approvedByAll = vpOfFinance.Zip(vpOfSales, Tuple.Create);

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

But now there is a new requirement: the number of roles required to approve something can vary:

public class ApprovalRequested
{
    public string[] Roles;
}
var approvalRequest = Subscribe<ApprovalRequested>().Take(1);
var approvals = Subscribe<Approved>();

var approvedByAll = ???;

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

I feel like I am missing something pretty obvious here... can anyone point me in the right direction?

Edit

To clarify: The approval process is on a per item basis. The order that the approvals can arrive in is undefined. We don't care if one role approves an item multiple times.

Upvotes: 4

Views: 1343

Answers (2)

Asti
Asti

Reputation: 12667

The problem can essentially be reduced to creating a Set from a stream of values where values may be out of order or many in nature.

If N is the cardinality of the set, we can trivially assume that the process will not proceed until at least N types of values (roles in this case) have been pushed.

Here's a sample solution of the Zip operator; perhaps this can get you started:

    public static IObservable<IList<T>> Zip<T>(this IList<IObservable<T>> observables)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            List<List<T>> store = new List<List<T>>(Enumerable.Range(1, observables.Count).Select(_ => new List<T>()));

            return new CompositeDisposable(observables.Select((o, i) => 
                o.Subscribe(value =>
                {
                    lock (store)
                    {
                        store[i].Add(value);

                        if (store.All(list => list.Count > 0))
                        {
                            observer.OnNext(store.Select(list => list[0]).ToList());
                            store.ForEach(list => list.RemoveAt(0));
                        }
                    }
                }))
            );
        });
    }

Test:

        Observable.Interval(TimeSpan.FromSeconds(0.5))
                  .GroupBy(i => i % 3)
                  .Select(gr => gr.AsObservable())
                  .Buffer(3)                      
                  .SelectMany(set => set.Zip())
                  .Subscribe(v => Console.WriteLine(String.Join(",", v)));

One issue here is that you may lose out on the initial values while the groups are being formed, so you might want to incorporate that by rewriting the method as IObservable<IList<T>> Zip<TKey, T>(this IGroupedObservable<TKey, T> observables).

Upvotes: 2

svick
svick

Reputation: 244767

In the current version of Rx (which I got from NuGet), there is a version of Zip() that takes a collection of observables and returns an observable of collections. With that, you can do something like this:

string[] requiredApprovals = …;

var approvedByAll = requiredApprovals
    .Select(required => approvals.Where(a => a.ApproverRole == required))
    .Zip();

approvedByAll.Subscribe(_ => SomeInterestingBusinessProcess());

But as @Enigmativity pointed out, this will work only if you can be sure that each person approves in the same order and that all items will eventually be approved by all required roles. If not, you will need something more complicated than just a Zip().

Upvotes: 2

Related Questions