Reputation: 29
I've spent hours combing through documentation and tutorials, but can't figure out how to use ReactiveX to poll an external resource, or anything for that matter, every at an interval. Below is some code I wrote to get information from a REST API at an interval.
open System
open System.Reactive.Linq
module MyObservable =
let getResources =
async {
use client = new HttpClient()
let! response = client.GetStringAsync("http://localhost:8080/items") |> Async.AwaitTask
return response
} |> Async.StartAsTask
let getObservable (interval: TimeSpan) =
let f () = getResources.Result
Observable.Interval(interval)
|> Observable.map(fun _ -> f ())
To test this out, I tried subscribing to the Observable and waiting five seconds. It does receive something every second for five seconds, but the getResources
is only called the first time and then the result is just used at each interval. How can I modify this to make the REST call at each interval instead of just the result of the first call being used over and over again?
let mutable res = Seq.empty
getObservable (new TimeSpan(0,0,1))
|> Observable.subscribe(fun (x: seq<string>) -> res <- res |> Seq.append x;)
|> ignore
Threading.Thread.Sleep(5000)
Upvotes: 0
Views: 124
Reputation: 80744
Don't use a Task
. Tasks are what we call "hot", meaning that if you have a value of type Task
in your hand, it means that the task is already running, and there is nothing you can do about it. In particular, this means you cannot restart it, or start a second instance of it. Once a Task
is created, it's too late.
In your particular case it means that getResources
is not "a way to start a task", but just "a task". Already started, already running.
If you want to start a new task every time, you have two alternatives:
First (the worse alternative), you could make getResources
a function rather than a value, which you can do by giving it a parameter:
let getResources () =
async { ...
And then call it with that parameter:
let f () = getResources().Result
This will run the getResources
function afresh every time you call f()
, which will create a new Task
every time and start it.
Second (a better option), don't use a Task
at all. You're creating a perfectly good async
computation and then turning it into a Task
only to block on getting its result. Why? You can block on an async
's result just as well!
let getResources = async { ... }
let getObservable interval =
let f () = getResources |> Async.RunSynchronously
...
This works, even though getResources
is not a function, because async
s, unlike Task
s, are what we call "cold". This means that, if you have an async
in your hand, it doesn't mean that it's already running. async
, unlike Task
, represents not an "already running" computation, but rather "a way to start a computation". A corollary is that you can start it multiple times from the same async
value.
One way to start it is via Async.RunSynchronously
as I'm doing in my example above. This is not the best way, because it blocks the current thread until the computation is done, but it's equivalent to what you were doing with accessing the Task.Result
property, which also blocks until the Task
is done.
Upvotes: 2