Reputation: 20934
I'm still figuring out the proper use of the different Rx* operators and stumbled upon the following problem:
I have a collection of models of the following type:
class Model {
final long timestamp;
final Object data;
public Model(long timestamp, Object data) {
this.timestamp = timestamp;
this.data = data;
}
}
This collection is sorted in ascending order (sorted by timestamp).
My goal is to group them into "sequences". A "sequence" is a run of elements where each element is really close to its neighbor:
----A-B-C-----D-E-F---H-I--->
In this case I have 3 "sequences". Position on the axis is defined by the Model's timestamp property (not the emission time). The max distance between neighbors in a sequence should be configurable.
Or, to take a more realistic example:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
In this case, for a max distance of 10 ms, I would get 3 sequences: (0,5,10), (100,108,111,115), (200,201,202)
This logic is really similar to the debounce operator, but instead of debouncing by real time, I need to debounce by a custom property.
This is how I would do it if the timestamp represented the emission time:
List<Model> models = new ArrayList<Model>(10) {{
add(new Model(0, null));
add(new Model(5, null));
add(new Model(10, null));
add(new Model(100, null));
add(new Model(108, null));
add(new Model(111, null));
add(new Model(115, null));
add(new Model(200, null));
add(new Model(201, null));
add(new Model(202, null));
}};
Observable<Model> modelsObservable = Observable.from(models).share();
modelsObservable.buffer(modelsObservable.debounce(10, TimeUnit.MILLISECONDS))
.subscribe(group -> {
//this is one of my groups
});
It doesn't necessarily need to be debounce. I was also looking at the groupBy operator, but I couldn't figure out the proper grouping criteria.
Upvotes: 2
Views: 500
Reputation: 2962
I wouldn't fiddle with the schedulers; instead, I'd leverage Buffer/Window (depending on whether you need downstream observables or collections) and Scan.
In Rx.NET you can achieve it with:
var models = new[] { 0, 5, 10, 100, 108, 111, 115, 200, 201, 202 }
.ToObservable();
var enrichedModels = models.Scan(
new { Current = -1, Prev = -1 },
(acc, cur) => new { Current = cur, Prev = acc.Current })
.Skip(1).Publish();
enrichedModels.Buffer(() => enrichedModels.SkipWhile(em => em.Current < em.Prev + 10))
.Select(seq => seq.Select(em => em.Prev))
.Subscribe(seq =>
{
Console.WriteLine(String.Join(",", seq));
});
enrichedModels.Connect();
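Results in the following (note that the last value, 202, is missing: each pair is projected to its Prev, and 202 is never the Prev of any pair):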
0,5,10
100,108,111,115
200,201
Publish/Connect can probably be skipped if your source observable is hot. RxJava has the same operators but no anonymous types; I guess they can be replaced by either a tuple or a concrete class.
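The scan-with-previous idea does not depend on Rx at all. As a rough illustration, here is a plain-Java sketch of the same gap-based split over an already sorted array. The class name GapGrouping and the method groupByGap are hypothetical names made up for this example, not RxJava or Rx.NET API. It splits when the gap is strictly greater than the max distance (the Rx.NET snippet above splits at a gap of 10 or more), and it flushes the last run so the final element is kept.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of any Rx library.
final class GapGrouping {

    // Splits ascending timestamps into runs whose neighbours are at most maxGap apart.
    static List<List<Long>> groupByGap(long[] timestamps, long maxGap) {
        List<List<Long>> groups = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            // The "Prev" of the Scan step is simply the previous array element here.
            if (i > 0 && timestamps[i] - timestamps[i - 1] > maxGap) {
                groups.add(current);          // gap too large: close the current run
                current = new ArrayList<>();  // and start a new one
            }
            current.add(timestamps[i]);
        }
        if (!current.isEmpty()) {
            groups.add(current);              // flush the last run
        }
        return groups;
    }

    public static void main(String[] args) {
        long[] ts = {0, 5, 10, 100, 108, 111, 115, 200, 201, 202};
        // prints [[0, 5, 10], [100, 108, 111, 115], [200, 201, 202]]
        System.out.println(groupByGap(ts, 10));
    }
}
```

In a reactive pipeline the same comparison would live inside the scan accumulator, with a small concrete class standing in for the anonymous type.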
Upvotes: 2
Reputation: 69997
A bit unconventional, but you could use the TestScheduler here: schedule each value's emission at its timestamp, then use the debounce trick with this scheduler and move the virtual time ahead.
TestScheduler s = new TestScheduler();
Scheduler.Worker w = s.createWorker();
PublishSubject<Object> subject = PublishSubject.create();
for (Model m : models) {
w.schedule(() -> subject.onNext(m.data),
m.timestamp, TimeUnit.MILLISECONDS);
}
subject.buffer(subject.debounce(10, TimeUnit.MILLISECONDS, s))
.subscribe(list -> ...);
s.advanceTimeBy(Long.MAX_VALUE / 2, TimeUnit.MILLISECONDS);
w.unsubscribe();
(There were attempts at implementing a virtual-time scheduler in RxJava, but the discussion was abandoned and the proposed implementation was rejected.)
Upvotes: 1