Reputation: 1713
I've a requirement where I need to cache the results of an event into an asyncSeq
and iterate though this asyncSeq
once a long running function has returned. The idea here is to iterate through profiles and apply runFunction
to both the cached results as well as the future results (hence the use of ofObservableBuffered
).
I would like to know what's the best way to do this. I'm using AsyncSeq.Cache
as shown below. AsyncSeq.Cache
is using a combination of ResizeArray
and MailboxProcessor
to accomplish the caching. However, I'm not sure if this will lead to a memory leak.
let profiles =
client.ProfileReceived
|> AsyncSeq.ofObservableBuffered
|> AsyncSeq.cache
do! longRunningFunction()
do! function2 (fun _ ->
async {
do! profiles
|> AsyncSeq.iterAsync(fun profile -> async {
do! runFunction profile
return()
})
}
Upvotes: 0
Views: 81
Reputation: 243096
Your question does not give all details about your scenario, but I think the answer is that you do not need AsyncSeq.cache
and using just AsyncSeq.ofObservableBuffered
should be enough.
Asynchronous sequences generate values on demand ("pull based") which means that elements are only generated when needed. Observables are "push based" which means that they generate data whenever the data source decides.
To map from "push based" to "pull based", you either need to drop data (when the listener is not ready to accept the next item) or cache data. If you cache, then you may potentially run out of memory if the producer is faster than the consumer - but this is inevitable by design problem. The AsyncSeq.ofObservableBuffered
function does the latter.
AsyncSeq.cache
is useful if you have one data source, but want to consume it from multiple different places. Without this, the data source will generate data repeatedly for each consumer, so cache
enables generating the data just once. However, if you are using ofObservableBuffered
is already doing the same thing - i.e. caching all the generated values.
I believe the only reason why you might want to keep cache
is if you subscribe to the observable at a later time - in which case, it would keep elements generated before you started consuming values (and use two caches, one of which would keep not-yet-consumed elements (possibly safe if the consumer is fast enough) and one which would keep all the already generated elements (certainly potential to run out of memory over a longer period of time)).
Upvotes: 2