Reputation: 1327
In a Node application I'm trying to process a stream of events using RxJS. The event stream is a list of changes to many documents. I'm using groupBy to partition the stream into new streams by documentId. But I'm wondering: once a document is closed on the client and no new events are added to the stream for that documentId, will groupBy dispose of that document's stream once it is empty? If not, how would I do that manually? I want to avoid a memory leak caused by new document streams being created but never destroyed.
Upvotes: 1
Views: 1189
Reputation: 39182
What I'd suggest doing: instead of just having a documentChanges observable, have a documentEvents observable.
Clients will send documentOpened events when they open a document, documentChanged events when they change a document and documentClosed events when they close a document.
By sending all 3 types of events through the same observable, you establish and guarantee an ordering. If a client sends documentOpened, documentChanged, and documentClosed events in that order, then your server will see them in that order. Note there won't be any guarantees about the order of events sent by 2 different clients. This just ensures that the events sent by a particular client arrive in order.
And then, this is how you'd use groupByUntil:
documentEvents
    .groupByUntil(
        function (e) { return e.documentId; }, // key
        null, // element
        function (group) { // duration selector
            var documentId = group.key;
            return group.filter(function (e) { return e.eventType === 'documentClosed'; });
        })
    .flatMap(function (eventsForDocument) {
        var documentId = eventsForDocument.key;
        return eventsForDocument.whatever(...);
    })
    .subscribe(...);
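To make the group lifecycle concrete without depending on a particular RxJS version, here is a minimal plain-JavaScript sketch of the bookkeeping that a close-driven duration selector implies. It is not RxJS code: createDocumentGrouper, onGroupOpened, liveGroupCount and the { documentId, eventType } event shape are assumptions made for illustration, and recording the closing event in the group before evicting it is a choice of this sketch.

```javascript
// Hypothetical stand-in for groupByUntil: one live group per documentId,
// removed as soon as a 'documentClosed' event for that id arrives.
function createDocumentGrouper(onGroupOpened) {
  var groups = new Map(); // documentId -> { key, events }

  return {
    push: function (e) {
      var group = groups.get(e.documentId);
      if (!group) {
        // First event for this key (or first since it was closed): new group.
        group = { key: e.documentId, events: [] };
        groups.set(e.documentId, group);
        onGroupOpened(group);
      }
      group.events.push(e);
      if (e.eventType === 'documentClosed') {
        // The "duration" has fired: forget the group so it can be collected.
        groups.delete(e.documentId);
      }
    },
    liveGroupCount: function () { return groups.size; }
  };
}

var opened = [];
var grouper = createDocumentGrouper(function (group) { opened.push(group); });

grouper.push({ documentId: 'a', eventType: 'documentOpened' });
grouper.push({ documentId: 'a', eventType: 'documentChanged' });
grouper.push({ documentId: 'b', eventType: 'documentOpened' });
grouper.push({ documentId: 'a', eventType: 'documentClosed' });
// Reopening 'a' after it was closed starts a fresh group.
grouper.push({ documentId: 'a', eventType: 'documentOpened' });
```

The point of the sketch is that nothing keeps a reference to a closed document's group, so the number of live groups tracks the number of open documents rather than every document ever seen.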
Another option that is a lot simpler: you can just expire the group after an idle period. Depending on what you are doing with the events this may be more than sufficient. This example expires a group if the document has not been edited in 5 minutes. If more edits come in then a new group is spun up.
var idleTime = 5 * 60 * 1000;
events
    .groupByUntil(
        function(e) { return e.documentId; },
        null,
        function(g) { return g.debounce(idleTime); })
    .flatMap...
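The idle-expiry idea can also be sketched in plain JavaScript, with timestamps passed in explicitly so the behaviour is deterministic. This is only an illustration of the policy, not how RxJS implements debounce: createIdleGrouper, sweep, liveGroupCount and the idleMs variable are hypothetical names.

```javascript
// Hypothetical sketch of idle expiry: each group records when it last saw an
// event, and sweep(now) evicts groups that have been quiet for idleTime ms.
function createIdleGrouper(idleTime) {
  var groups = new Map(); // documentId -> { key, events, lastSeen }

  return {
    push: function (e, now) {
      var group = groups.get(e.documentId);
      if (!group) {
        group = { key: e.documentId, events: [], lastSeen: now };
        groups.set(e.documentId, group);
      }
      group.events.push(e);
      group.lastSeen = now; // every event restarts the idle window
    },
    sweep: function (now) {
      var expired = [];
      groups.forEach(function (group, key) {
        if (now - group.lastSeen >= idleTime) {
          expired.push(key);
        }
      });
      expired.forEach(function (key) { groups.delete(key); });
      return expired; // keys of the groups that were just evicted
    },
    liveGroupCount: function () { return groups.size; }
  };
}

var minute = 60 * 1000;
var idleMs = 5 * minute;
var idle = createIdleGrouper(idleMs);

idle.push({ documentId: 'a' }, 0);
idle.push({ documentId: 'b' }, 1 * minute);
idle.push({ documentId: 'a' }, 4 * minute); // 'a' is edited again

// At minute 6, 'b' has been idle for 5 minutes and 'a' for only 2.
var firstSweep = idle.sweep(6 * minute);
// At minute 9, 'a' has been idle for 5 minutes as well.
var secondSweep = idle.sweep(9 * minute);
```

A later edit to an evicted document simply creates a new group, which matches the "new group is spun up" behaviour described above.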
Upvotes: 2
Reputation: 29776
Since you included the .NET tag, I'll cover Rx.NET as well.
Your question is phrased a bit incorrectly. Streams are empty if and only if they never have an event, so they can't become empty. A stream that isn't emitting data doesn't typically consume much in the way of resources, though.
In .NET, groups will not terminate until the source terminates. We use GroupByUntil, which allows you to specify a durationSelector stream for each group. Observable.Timer often works well for this.
This means that you may get multiple non-concurrent streams with the same key appearing over time, but if (as is often the case) your group streams are flattened at some point, it won't matter.
In rxjs, we also have groupByUntil.
In Rx-Java, the groupByUntil method, which behaved similarly, was rolled into groupBy - see https://github.com/ReactiveX/RxJava/pull/1727 and https://github.com/benjchristensen/RxJava/commit/b9302956832e3e77579f63fd9db25aa60eb4192a for more details.
http://reactivex.io/documentation/operators/groupby.html says:
If you unsubscribe from one of the GroupedObservables, that GroupedObservable will be terminated. If the source Observable later emits an item whose key matches the GroupedObservable that was terminated in this way, groupBy will create and emit a new GroupedObservable to match the key.
So, in Rx-Java you must unsubscribe from a grouped observable stream to terminate it. takeUntil with a timer stream can work for this.
Addendum:
In response to your comment, a stream will not terminate until a downstream operator unsubscribes from it. The duration selector of groupByUntil would cause termination. If a document will not be opened again once closed, then you can just send a "documentClosed" event into the stream and use a regular groupBy with a takeWhile testing for the "documentClosed" event.
It's important that the document is not opened again because with groupBy (in RxJS and Rx.NET) a new group will not be created if an already seen key reappears.
If this is a problem, then you will need to use groupByUntil and use a published stream to watch for the documentClosed event - using a published stream will ensure you don't get subscription side effects.
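As a rough illustration of why reopening matters with plain groupBy, the following plain-JavaScript model mimics the behaviour described above: a key is remembered forever, and once the consumer for that key has stopped listening, later events for the key go nowhere. It is a model under assumptions, not RxJS code; createStickyGrouper, rememberedKeyCount and the event shape are hypothetical, and delivering the closing event before the consumer stops is a choice of this sketch.

```javascript
// Hypothetical model of plain groupBy + takeWhile: keys are remembered forever,
// and a consumer that has stopped listening is never replaced.
function createStickyGrouper(onGroupOpened) {
  var consumers = new Map(); // documentId -> consumer function, or null once done

  return {
    push: function (e) {
      if (!consumers.has(e.documentId)) {
        // A group is announced only the first time a key is seen.
        consumers.set(e.documentId, onGroupOpened(e.documentId));
      }
      var consumer = consumers.get(e.documentId);
      if (consumer === null) {
        return; // consumer already stopped listening: the event is dropped
      }
      consumer(e);
      if (e.eventType === 'documentClosed') {
        consumers.set(e.documentId, null); // models takeWhile ending the subscription
      }
    },
    rememberedKeyCount: function () { return consumers.size; }
  };
}

var groupsOpened = 0;
var delivered = [];
var sticky = createStickyGrouper(function (documentId) {
  groupsOpened += 1;
  return function (e) { delivered.push(e.eventType); };
});

sticky.push({ documentId: 'a', eventType: 'documentChanged' });
sticky.push({ documentId: 'a', eventType: 'documentClosed' });
// The document is reopened: no new group is announced and the event is lost.
sticky.push({ documentId: 'a', eventType: 'documentChanged' });
```

In this model the key also stays in the map after the document is closed, which is the kind of growth the original question is worried about.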
Upvotes: 1