Reputation: 10039
I have an Rx stream that is an outgoing change feed from a particular component.
Occasionally I want to be able to enter a batch mode on the stream so that items passed to onNext accumulate and are passed on only when the batch mode is exited.
Normally I'll pass single items through the stream:
stream.onNext(1);
stream.onNext(2);
There is a one-to-one mapping between the items passed to onNext and the items the subscriber receives, so the previous snippet results in two calls to the subscribe callback, with the values 1 and 2.
The batch mode I am looking for might work something like this:
stream.enterBatchMode();
stream.onNext(1);
stream.onNext(2);
stream.exitBatchMode();
In this case I want the subscribe callback to be invoked only once, with the single concatenated array [1, 2].
Just to reiterate, I need batch mode only sometimes; at other times I pass the non-concatenated items through.
How can I achieve this behaviour with Rx?
NOTE: Previously I was passing arrays through onNext, mostly so that the type stays the same in individual mode and in batch mode.
E.g.:
stream.onNext([1, 2, 3]);
stream.onNext([4, 5, 6]);
The subscriber receives [1, 2, 3] then [4, 5, 6], but in batch mode it receives the single concatenated result [1, 2, 3, 4, 5, 6].
So when you look at it this way it's more like unconcatenated and concatenated modes (as opposed to individual and batch-mode). But I think the problem is very similar.
Upvotes: 1
Views: 1741
Reputation: 39212
Here is the spirit of James World's C# solution, converted to JavaScript and tailored to your specific question.
var source = new Rx.Subject();
source.batchMode = new Rx.BehaviorSubject(false);
source.enterBatchMode = function() { this.batchMode.onNext(true); };
source.exitBatchMode = function() { this.batchMode.onNext(false); };
var stream = source
    .window(source.batchMode
        .distinctUntilChanged()
        .skipWhile(function(mode) { return !mode; }))
    .map(function(window, i) {
        if ((i % 2) === 0) {
            // even windows are unbatched windows
            return window;
        }
        // odd windows are batched
        // collect the entries in an array
        // (e.g. an array of arrays)
        // then use Array.concat() to join
        // them together
        return window
            .toArray()
            .filter(function(array) { return array.length > 0; })
            .map(function(array) {
                return Array.prototype.concat.apply([], array);
            });
    })
    .concatAll();
stream.subscribe(function(value) {
console.log("received ", JSON.stringify(value));
});
source.onNext([1, 2, 3]);
source.enterBatchMode();
source.onNext([4]);
source.onNext([5, 6]);
source.onNext([7, 8, 9]);
source.exitBatchMode();
source.onNext([10, 11]);
source.enterBatchMode();
source.exitBatchMode();
source.onNext([12]);
source.enterBatchMode();
source.onNext([13]);
source.onNext([14, 15]);
source.exitBatchMode();
<script src="https://getfirebug.com/firebug-lite-debug.js"></script>
<script src="https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js"></script>
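To see the concatenation step in isolation, here is a small sketch with no Rx dependency. The name flattenBatch is made up for illustration; it simply applies the same Array.prototype.concat call used in the batched window to the array of arrays that toArray() would hand over.

```javascript
// Hypothetical helper (not part of RxJS): joins an array of arrays into one
// flat array, the same way the batched window does after toArray().
function flattenBatch(arrays) {
  return Array.prototype.concat.apply([], arrays);
}

var flattened = flattenBatch([[4], [5, 6], [7, 8, 9]]);
console.log(JSON.stringify(flattened)); // [4,5,6,7,8,9]
```

Note that this only flattens one level, which is all that is needed when every item pushed through onNext is itself a flat array.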
Upvotes: 2
Reputation: 10039
A solution... of sorts. After failing to achieve anything with the Window operator (from James' linked example) I came up with the following solution, although it is in C# and I have no idea how to translate it to RxJS and JavaScript.
If someone can better this using built-in Rx functions, please do so, I'll very much appreciate a better solution.
I have implemented a BatchedObservable class that is an observer of type T and an observable of type IEnumerable<T>. In normal mode it wraps each T in an array that is passed on. In batch mode it accumulates Ts until batch mode is exited, then it passes on the collected list of batched Ts.
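using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
public class BatchedObservable<T> : IObservable<IEnumerable<T>>, IObserver<T>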
{
bool batching = false;
List<T> batchedItems;
List<IObserver<IEnumerable<T>>> observers = new List<IObserver<IEnumerable<T>>>();
public void EnterBatchMode()
{
batching = true;
batchedItems = new List<T>();
}
public void ExitBatchMode()
{
batching = false;
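// Guard against ExitBatchMode being called without a matching EnterBatchMode.
if (batchedItems != null && batchedItems.Count > 0)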
{
foreach (var observer in observers)
{
observer.OnNext(batchedItems);
}
}
batchedItems = null;
}
public IDisposable Subscribe(IObserver<IEnumerable<T>> observer)
{
observers.Add(observer);
return Disposable.Create(()
=> observers.Remove(observer)
);
}
public void OnCompleted()
{
foreach (var observer in observers)
{
observer.OnCompleted();
}
}
public void OnError(Exception error)
{
foreach (var observer in observers)
{
observer.OnError(error);
}
}
public void OnNext(T value)
{
if (batching)
{
batchedItems.Add(value);
}
else
{
foreach (var observer in observers)
{
observer.OnNext(new T[] { value });
}
}
}
}
Example usage follows. When running, hit any key to toggle batch mode.
static void Main(string[] args)
{
var batched = new BatchedObservable<long>();
var dataStream = Observable.Interval(TimeSpan.FromSeconds(1));
dataStream.Subscribe(batched);
batched.Subscribe(PrintArray);
var batchModeEnabled = false;
while (true)
{
Console.ReadKey();
batchModeEnabled = !batchModeEnabled;
if (batchModeEnabled)
{
batched.EnterBatchMode();
}
else
{
batched.ExitBatchMode();
}
}
}
private static void PrintArray<T>(IEnumerable<T> a)
{
Console.WriteLine("[" + string.Join(", ", a.Select(i => i.ToString()).ToArray()) + "]");
}
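For anyone wanting the same idea in JavaScript, the following is a rough sketch of how the class above might be ported, assuming plain callbacks instead of Rx observers. It does not use RxJS at all; the constructor, the emit helper, and the unsubscribe function returned from subscribe are illustrative choices, not an existing API, and completion and error forwarding are left out.

```javascript
// Hypothetical plain-JavaScript port of BatchedObservable (no Rx dependency).
function BatchedObservable() {
  this.batching = false;
  this.batchedItems = [];
  this.observers = [];
}

// Registers a callback and returns a function that removes it again.
BatchedObservable.prototype.subscribe = function (observer) {
  var observers = this.observers;
  observers.push(observer);
  return function unsubscribe() {
    var index = observers.indexOf(observer);
    if (index !== -1) {
      observers.splice(index, 1);
    }
  };
};

// Delivers one array of items to every current observer.
BatchedObservable.prototype.emit = function (items) {
  // Iterate over a copy so an observer may unsubscribe during delivery.
  this.observers.slice().forEach(function (observer) {
    observer(items);
  });
};

BatchedObservable.prototype.enterBatchMode = function () {
  this.batching = true;
  this.batchedItems = [];
};

BatchedObservable.prototype.exitBatchMode = function () {
  var items = this.batchedItems;
  this.batching = false;
  this.batchedItems = [];
  // An empty batch produces no notification at all.
  if (items.length > 0) {
    this.emit(items);
  }
};

BatchedObservable.prototype.onNext = function (value) {
  if (this.batching) {
    this.batchedItems.push(value);
  } else {
    // Normal mode: wrap the single value so the emitted type is always an array.
    this.emit([value]);
  }
};

var received = [];
var batched = new BatchedObservable();
batched.subscribe(function (items) { received.push(items); });
batched.onNext(1);
batched.enterBatchMode();
batched.onNext(2);
batched.onNext(3);
batched.exitBatchMode();
batched.onNext(4);
console.log(JSON.stringify(received)); // [[1],[2,3],[4]]
```

As in the C# version, subscribers always receive an array, so they do not need to know which mode produced it.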
Upvotes: 0
Reputation: 2662
James' comment may provide the answer you're looking for, but I'd like to offer a simple alternative that you may also like.
I read your question as: "How do I pass a value of either type A or type B?"
Answer: Define a type with "either" semantics, which contains data of either type A or type B.
var valueOrBuffer = function(value, buffer)
{
this.Value = value;
this.Buffer = buffer;
};
var createValue = function(value) { return new valueOrBuffer(value); };
var createBuffer = function(buffer) { return new valueOrBuffer(undefined, buffer); };
stream.onNext(createValue(1));
stream.onNext(createValue(2));
stream.onNext(createValue(3));
stream.onNext(createBuffer([4, 5, 6]));
No switching necessary. Your observer may have to change a bit though:
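stream.subscribe(function(v) {
// Test Buffer rather than Value so that falsy values such as 0 are still treated as values.
if (v.Buffer === undefined)
onNextValue(v.Value);
else
onNextBuffer(v.Buffer);
});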
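If the observer would rather deal with a single shape, one option is to normalize both cases into an array on the receiving side. The sketch below assumes the same Value/Buffer fields as above; the toItems helper is a made-up name, and the constructor is repeated (capitalized) only so the example runs on its own.

```javascript
// Same field layout as the valueOrBuffer type above.
function ValueOrBuffer(value, buffer) {
  this.Value = value;
  this.Buffer = buffer;
}

// Hypothetical helper: returns the buffer as-is, or wraps a single value in
// an array, so callers always receive an array.
function toItems(v) {
  return v.Buffer !== undefined ? v.Buffer : [v.Value];
}

console.log(JSON.stringify(toItems(new ValueOrBuffer(0))));                    // [0]
console.log(JSON.stringify(toItems(new ValueOrBuffer(undefined, [4, 5, 6])))); // [4,5,6]
```

Checking Buffer against undefined, rather than testing Value for truthiness, keeps values such as 0 or an empty string from being mistaken for the buffer case.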
Upvotes: 1