Reputation: 93
I try a concurrent download with limited number of Workers using the AsyncSeq module.
Based on the FSharpX example of https://github.com/fsprojects/fsharpx/blob/master/samples/Crawler.fsx
let rec concurrentDownload concurrentWorkers download transform (urls:string list) =
asyncSeq {
let requests = BlockingQueueAgent<_>(1000)
let results = BlockingQueueAgent<_>(50)
let worker() = async {
while true do
let! url = requests.AsyncGet()
let! doc = download url
match doc with
| Some txt -> do! results.AsyncAdd( transform txt )
| _ -> ()
}
// fill in all the requests
for url in urls do
do! requests.AsyncAdd url
// create the workers and start them
for i in 1 .. concurrentWorkers do
worker() |> Async.Start
// get the results and yield them in the asynchronous sequence
while requests.Count > 0 && results.Count > 0 do
let! res = results.AsyncGet()
yield res
}
let rand = new System.Random()
let rnd() = rand.Next(0,4000)
// a simulated download, sleep time depends on random number
let download str = async {
let name = "dl " + str
let payload = rnd()
printfn "Started : %s (payload=%d)" name payload
do! Async.Sleep(1000 + payload)
printfn "Finished: %s" name
return Some name
}
let urls = [1..10] |> List.map (sprintf "URL %d")
let concurrentWorkers = 5
let transform = id
let ret = concurrentDownload concurrentWorkers download transform urls
//ret // val it : AsyncSeq<string> = Microsoft.FSharp.Control.FSharpAsync`1[FSI_0012.FSharp.Control.AsyncSeqInner`1[System.String]]
let z =
ret
|> AsyncSeq.toBlockingSeq
|> Seq.toList
I assumed that z gets something like seq ["dl URL 3"; "dl URL 5"; ... ] because 'download' returns Some content. The workers on the blocking queues working as expected:
Started : dl URL 1 (payload=2281)
Started : dl URL 3 (payload=741)
Started : dl URL 4 (payload=3283)
Started : dl URL 5 (payload=1117)
Started : dl URL 2 (payload=2435)
Finished: dl URL 3
Started : dl URL 6 (payload=263)
Finished: dl URL 5
Started : dl URL 7 (payload=1115)
Finished: dl URL 6
Started : dl URL 8 (payload=1041)
Finished: dl URL 1
Started : dl URL 9 (payload=959)
Finished: dl URL 2
Started : dl URL 10 (payload=604)
Finished: dl URL 7
Finished: dl URL 4
Finished: dl URL 10
Finished: dl URL 8
Finished: dl URL 9
The problem is, why is z an empty list? And not as exprected seq ["dl URL 3"; "dl URL 5"; ... ]?
As a reference, here is the toBlockingSeq function:
// FSharpX AsyncSeq.toBlockingSeq
let toBlockingSeq (input : AsyncSeq<'T>) =
// Write all elements to a blocking buffer and then add None to denote end
let buf = new BlockingQueueAgent<_>(1)
async {
do! iterAsync (Some >> buf.AsyncAdd) input
do! buf.AsyncAdd None
} |> Async.Start
// Read elements from the blocking buffer & return a sequences
let rec loop () =
seq {
match buf.Get() with
| None -> ()
| Some v ->
yield v
yield! loop()
}
loop ()
Upvotes: 0
Views: 135
Reputation: 5323
The new solution is maxDegreeOfParallelism parameter for Async.Parallel and you can do also Async.Sequential
Upvotes: 0