Reputation: 35134
I have an IObservable
provided by a library, which listens to events from an external service:
let startObservable () : IObservable<'a> = failwith "Given"
For each received event I want to execute an action which returns Async
:
let action (item: 'a) : Async<unit> = failwith "Given"
I'm trying to implement a processor in lines of
let processor () : Async<unit> =
startObservable()
|> Observable.mapAsync action
|> Async.AwaitObservable
I've made up mapAsync
and AwaitObservable
: ideally they would be provided by some library, which I'm failing to find so far.
Extra requirements:
Actions should be executed sequentially, so subsequent events get buffered while a previous event is handled.
If an action throws an error, I want my processor to complete. Otherwise, it never completes.
Cancellation token passed via Async.Start
should be respected.
Any hints about the library that I should be using?
Upvotes: 3
Views: 472
Reputation: 3476
Hopac could easily interop with IObservables
You could convert Hopac Jobs
to Async
with Job.toAsync
open System
open Hopac
let startObservable () : IObservable<'a> = failwith "Given"
let action (item: 'a) : Job<unit> = failwith "Given"
let processor () : Job<unit> =
startObservable()
|> Stream.ofObservable
|> Stream.mapJob action
|> Stream.iter
Upvotes: 1
Reputation: 7542
Since you want to convert a push-based model (IObservable<>
) into pull-based (Async<>
), you'll need to queue to buffer data coming from observable. If queue is size-limited - which tbh. should be to to make entire pipeline safe to not overflood the memory - then also a strategy for buffer overflow is needed.
MailboxProcessor<>
and custom observable, which would Post data to it. Since MP is a native F# actor implementation, it's able to make ordered processing with queue for buffering spikes.Another option is to use FSharp.Control.AsyncSeq
(and specifically AsyncSeq.ofObservableBuffered function) which will turn observable into pull-based async enumerable - underneath it uses mailbox processor from 1st point:
startObservable()
|> AsyncSeq.ofObservableBuffered
|> AsyncSeq.iterAsync action
Upvotes: 5