Ilya Bokovenko

Reputation: 11

How to merge two arrays using Rx

I need to merge two arrays using UniRx to get an Observable that emits the first elements of both arrays, then the second elements, and so on, and then emits the rest of the longer array.

I tried Zip, but Zip cuts off the tail of the longer array. I tried Merge with Scheduler.DefaultSchedulers.Iteration, but it starts parallel threads, which I don't want.

var x1 = new[] {1, 2, 3}.ToObservable();
var x2 = new[] {4, 5, 6, 7, 8, 9}.ToObservable();
var merge = x1.Merge(x2);
merge.Subscribe(i => print(i));

I expected 1 4 2 5 3 6 7 8 9, but I got 1 2 3 4 5 6 7 8 9.

Upvotes: 1

Views: 564

Answers (3)

Enigmativity

Reputation: 117174

This works for me as you expect:

var a1 = new int[] { 1, 2, 3 };
var a2 = new int[] { 4, 5, 6, 7, 8, 9 };

var x1 = a1.Select(x => (int?)x).ToObservable().Concat(Observable.Repeat((int?)null));
var x2 = a2.Select(x => (int?)x).ToObservable().Concat(Observable.Repeat((int?)null));

var query =
    x1
        .Zip(x2, (i1, i2) => new [] { i1, i2 })
        .TakeWhile(xs => !(xs[0] == null && xs[1] == null))
        .SelectMany(xs => xs)
        .Where(x => x != null)
        .Select(x => x.Value);

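// Assumed completion: the original snippet stops at "query"; subscribing prints each value.
query.Subscribe(x => Console.WriteLine(x));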

Upvotes: 1

Shlomo

Reputation: 14370

The solution posted by @mm8 works with two Observables that have a defined end (like arrays masquerading as observables), but it will hang forever with infinite observables, which are a common case, because Count() only emits once its source completes. For example:

var odds = Observable.Interval(TimeSpan.FromMilliseconds(10))
    .Select(i => i * 2 + 1);
var evens = Observable.Interval(TimeSpan.FromMilliseconds(10))
    .Select(i => i * 2);

var zip = Observable.Zip(evens, odds);
int skip = await zip.Count().LastAsync();

// Hangs forever here.

If you're looking to interleave arrays, the best way to do it is to interleave as arrays. Rx is meant to work on Observables. This isn't an observable problem.
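To illustrate what interleaving "as arrays" could look like, here is a minimal sketch in plain C# with no Rx involved. The class name InterleaveSketch and the method Interleave are hypothetical names chosen for this example, not part of UniRx or System.Reactive.

```csharp
using System;
using System.Collections.Generic;

static class InterleaveSketch
{
    // Hypothetical helper: alternates items from both sequences,
    // then yields whatever remains of the longer one.
    public static IEnumerable<T> Interleave<T>(IEnumerable<T> first, IEnumerable<T> second)
    {
        using (var e1 = first.GetEnumerator())
        using (var e2 = second.GetEnumerator())
        {
            bool has1 = e1.MoveNext();
            bool has2 = e2.MoveNext();

            // Keep going while either sequence still has items.
            while (has1 || has2)
            {
                if (has1)
                {
                    yield return e1.Current;
                    has1 = e1.MoveNext();
                }
                if (has2)
                {
                    yield return e2.Current;
                    has2 = e2.MoveNext();
                }
            }
        }
    }
}
```

Calling InterleaveSketch.Interleave(new[] { 1, 2, 3 }, new[] { 4, 5, 6, 7, 8, 9 }) should yield 1 4 2 5 3 6 7 8 9. If an observable is still needed downstream, the result can presumably be passed to ToObservable() afterwards, the same way the arrays in the question are.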

Upvotes: 0

mm8

Reputation: 169390

If you know the lengths of the arrays, you could concatenate the Zip sequence with the other two and skip the number of already zipped elements. This is C#, but it should give you the idea:

var a1 = new int[] { 1, 2, 3 };
var a2 = new int[] { 4, 5, 6, 7, 8, 9 };

var x1 = a1.ToObservable();
var x2 = a2.ToObservable();

int skip = Math.Min(a1.Length, a2.Length);
Observable.Zip(x1, x2).SelectMany(x => x)
    .Concat(x1.Skip(skip))
    .Concat(x2.Skip(skip))
    .Subscribe(i => Console.WriteLine(i));

Thank you, this code works for arrays. But how would I implement this for a generic IObservable<int>, where Length is not accessible?

You can always count the number of elements in the zipped observable:

var x1 = new int[] { 1, 2, 3 }.ToObservable();
var x2 = new int[] { 4, 5, 6, 7, 8, 9 }.ToObservable();

var zip = Observable.Zip(x1, x2);
int skip = await zip.Count().LastAsync();

zip.SelectMany(x => x)
    .Concat(x1.Skip(skip))
    .Concat(x2.Skip(skip))
    .Subscribe(i => Console.WriteLine(i));

Upvotes: 0
