Mikhail Shilkov

Reputation: 35134

Mixing IObservable and Async<'a> in F#

I have an IObservable provided by a library, which listens to events from an external service:

let startObservable () : IObservable<'a> = failwith "Given"

For each received event I want to execute an action which returns Async:

let action (item: 'a) : Async<unit> = failwith "Given"

I'm trying to implement a processor along the lines of

let processor () : Async<unit> =
    startObservable()
    |> Observable.mapAsync action
    |> Async.AwaitObservable

I've made up mapAsync and AwaitObservable: ideally they would be provided by some library, which I'm failing to find so far.

Extra requirements:

Any hints about the library that I should be using?

Upvotes: 3

Views: 472

Answers (2)

Szer

Reputation: 3476

Hopac interoperates easily with IObservable. You can convert Hopac Jobs to Async with Job.toAsync.

open System
open Hopac

let startObservable () : IObservable<'a> = failwith "Given"

let action (item: 'a) : Job<unit> = failwith "Given"

let processor () : Job<unit> =
    startObservable()
    |> Stream.ofObservable
    |> Stream.mapJob action
    |> Stream.iter

Upvotes: 1

Bartosz Sypytkowski

Reputation: 7542

Since you want to convert a push-based model (IObservable<>) into a pull-based one (Async<>), you'll need a queue to buffer the data coming from the observable. If the queue is size-limited (which, to be honest, it should be, so that the pipeline cannot exhaust memory), you also need a strategy for buffer overflow.

  1. One way is to implement a MailboxProcessor<> and a custom observer that Posts data to it. Since MailboxProcessor is F#'s native actor implementation, it processes messages in order and its queue buffers spikes.
  2. Another option is to use FSharp.Control.AsyncSeq (specifically the AsyncSeq.ofObservableBuffered function), which turns the observable into a pull-based async enumerable. Underneath, it uses the mailbox processor from the first point:

    startObservable()
    |> AsyncSeq.ofObservableBuffered
    |> AsyncSeq.iterAsync action
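
The MailboxProcessor option from point 1 could be sketched roughly as follows, using only FSharp.Core. The name `processWith` is made up for illustration. The sketch assumes an unbounded mailbox and ignores completion and error notifications from the observable, so it is a starting point rather than a complete solution:

```fsharp
open System

/// Hypothetical helper: subscribes to `source` and runs `action` for each
/// item, one at a time, in arrival order. Dispose the result to unsubscribe.
let processWith (action: 'a -> Async<unit>) (source: IObservable<'a>) : IDisposable =
    // The agent's internal queue buffers items that arrive while `action`
    // is still running. Note that this queue is unbounded.
    let agent =
        MailboxProcessor<'a>.Start(fun inbox ->
            let rec loop () =
                async {
                    let! item = inbox.Receive()
                    do! action item
                    return! loop ()
                }
            loop ())
    // Push side: every OnNext from the observable is posted to the mailbox.
    // OnError and OnCompleted are ignored by Observable.subscribe.
    source |> Observable.subscribe agent.Post
```

Because the mailbox has no size limit, a source that is consistently faster than `action` will still grow memory; a bounded variant would need an explicit overflow strategy, as noted above.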
    

Upvotes: 5
