Reputation: 11
I need to merge 2 arrays using UniRx in order to get Observable which emits first elements of arrays then second elements and so on, then emits the rest of the longest array
I tried Zip but Zip cuts the tail of longest array I tried Merge with Scheduler.DefaultSchedulers.Iteration but it starts a parallel threads which I don't want
var x1 = new[] {1, 2, 3}.ToObservable();
var x2 = new[] {4, 5, 6, 7, 8, 9}.ToObservable();
var merge = x1.Merge(x2);
merge.Subscribe(i => print(i));
I expected 1 4 2 5 3 6 7 8 9 I got 1 2 3 4 5 6 7 8 9
Upvotes: 1
Views: 564
Reputation: 117174
This works for me as you expect:
var a1 = new int[] { 1, 2, 3 };
var a2 = new int[] { 4, 5, 6, 7, 8, 9 };
var x1 = a1.Select(x => (int?)x).ToObservable().Concat(Observable.Repeat((int?)null));
var x2 = a2.Select(x => (int?)x).ToObservable().Concat(Observable.Repeat((int?)null));
var query =
x1
.Zip(x2, (i1, i2) => new [] { i1, i2 })
.TakeWhile(xs => !(xs[0] == null && xs[1] == null))
.SelectMany(xs => xs)
.Where(x => x != null)
.Select(x => x.Value);
Upvotes: 1
Reputation: 14370
The solution posted by @mm8 will work with two Observables with a defined end (like arrays masquerading as observables), but will hang forever with infinite observables, which are a common case. For example:
var odds = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Select(i => i * 2 + 1);
var evens = Observable.Interval(TimeSpan.FromMilliseconds(10))
.Select(i => i * 2);
var zip = Observable.Zip(evens, odds);
int skip = await zip.Count().LastAsync();
// Hangs forever here.
If you're looking to interleave arrays, the best way to do it is to interleave as arrays. Rx is meant to work on Observables. This isn't an observable problem.
Upvotes: 0
Reputation: 169390
If you know the length of the arrays, you could concatentate the Zip
sequence with the other two and skip the number of already zipped elements. This is C# but should get you the idea:
var a1 = new int[] { 1, 2, 3 };
var a2 = new int[] { 4, 5, 6, 7, 8, 9 };
var x1 = a1.ToObservable();
var x2 = a2.ToObservable();
int skip = Math.Min(a1.Length, a2.Length);
Observable.Zip(x1, x2).SelectMany(x => x)
.Concat(x1.Skip(skip))
.Concat(x2.Skip(skip))
.Subscribe(i => Console.WriteLine(i));
Thank you, this code works for arrays. But how to implement this operation for generic
IObservable<int>
in which caseLength
is not accessible?
You can always count the number of elements in the zipped observable:
var x1 = new int[] { 1, 2, 3 }.ToObservable();
var x2 = new int[] { 4, 5, 6, 7, 8, 9 }.ToObservable();
var zip = Observable.Zip(x1, x2);
int skip = await zip.Count().LastAsync();
zip.SelectMany(x => x)
.Concat(x1.Skip(skip))
.Concat(x2.Skip(skip))
.Subscribe(i => Console.WriteLine(i));
Upvotes: 0