su8898

Reputation: 1713

Combining AsyncSeq.ofObservableBuffered and AsyncSeq.cache

I have a requirement where I need to cache the results of an event into an asyncSeq and iterate through this asyncSeq once a long-running function has returned. The idea is to iterate through profiles and apply runFunction to both the cached results and any future results (hence the use of ofObservableBuffered).

I would like to know the best way to do this. I'm using AsyncSeq.cache as shown below. AsyncSeq.cache uses a combination of ResizeArray and MailboxProcessor to accomplish the caching. However, I'm not sure whether this will lead to a memory leak.

let profiles =
    client.ProfileReceived
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.cache
    
do! longRunningFunction()

do! function2 (fun _ ->
    async {
        do! profiles
            |> AsyncSeq.iterAsync (fun profile -> async {
                do! runFunction profile
                return ()
            })
    })

Upvotes: 0

Views: 81

Answers (1)

Tomas Petricek

Reputation: 243096

Your question does not give all the details of your scenario, but I think the answer is that you do not need AsyncSeq.cache; using just AsyncSeq.ofObservableBuffered should be enough.

Asynchronous sequences generate values on demand ("pull based") which means that elements are only generated when needed. Observables are "push based" which means that they generate data whenever the data source decides.

To map from "push based" to "pull based", you need to either drop data (when the listener is not ready to accept the next item) or cache data. If you cache, you may run out of memory if the producer is faster than the consumer, but this problem is inevitable by design. The AsyncSeq.ofObservableBuffered function does the latter: it caches values rather than dropping them.
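To make the "cache rather than drop" idea concrete, here is a minimal sketch that uses only FSharp.Core and a ConcurrentQueue. It is not how AsyncSeq.ofObservableBuffered is implemented; the event `profileReceived` and the helper `drain` are hypothetical stand-ins for client.ProfileReceived and for a consumer that pulls at its own pace.

```fsharp
open System.Collections.Concurrent

// Hypothetical push-based source standing in for client.ProfileReceived.
let profileReceived = Event<string>()

// Every pushed value is stored until a consumer asks for it.
// The queue is unbounded, so it grows for as long as nobody pulls.
let buffer = ConcurrentQueue<string>()

let subscription =
    profileReceived.Publish
    |> Observable.subscribe (fun profile -> buffer.Enqueue profile)

// The producer pushes three values before any consumer is ready.
for name in [ "a"; "b"; "c" ] do
    profileReceived.Trigger name

// Hypothetical consumer: pulls everything that was buffered, in arrival order.
let rec drain acc =
    match buffer.TryDequeue() with
    | true, item -> drain (item :: acc)
    | false, _ -> List.rev acc

let drained = drain []

subscription.Dispose()
```

Nothing is lost while the consumer is busy, at the price of memory that grows with the backlog. That is the trade-off described above.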

AsyncSeq.cache is useful if you have one data source but want to consume it from multiple places. Without it, the data source generates the data again for each consumer, so cache lets you generate the data just once. However, if you are using ofObservableBuffered, it is already doing the same thing, i.e. caching all the generated values.
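The "generate once, consume many times" behaviour can be illustrated with the synchronous Seq.cache from FSharp.Core. This is only an analogy, under the assumption that AsyncSeq.cache plays the same role for asynchronous sequences; the counter `generated` and the sequence `source` are hypothetical.

```fsharp
// Counts how many times the underlying source actually produces a value.
let generated = ref 0

// Hypothetical data source: re-runs its body for every new consumer.
let source =
    seq {
        for i in 1 .. 3 do
            generated.Value <- generated.Value + 1
            yield i
    }

// Two consumers of the uncached source: the generator runs twice.
let first = Seq.toList source
let second = Seq.toList source
let uncachedCount = generated.Value

// Two consumers of the cached source: the generator runs once,
// and the second consumer is served from the stored values.
generated.Value <- 0
let cached = Seq.cache source
let third = Seq.toList cached
let fourth = Seq.toList cached
let cachedCount = generated.Value
```

The cached version has to hold on to every value it has produced, which is why caching an unbounded stream raises the memory question in the first place.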

I believe the only reason you might want to keep cache is if you subscribe to the observable at a later time, in which case it would keep the elements generated before you started consuming values. You would then be using two caches: one holding the not-yet-consumed elements (possibly safe if the consumer is fast enough) and one holding all the already generated elements (which can certainly run out of memory over a longer period of time).

Upvotes: 2
