ca9163d9

Reputation: 29159

Parallel processing N threads?

Clarified question:

I have the following script, which accesses web and local resources. I want to limit the number of web connections to N (the website is slow), and the local resource access (executeLocalNetworkProcess) shouldn't block other web requests, so that N web requests are always running.

Some categories have very few items while others have a lot. The parallel execution should run over all items of all categories to make full use of the web connections.

let categories = getCategories() // get a seq of category from web service
for c in categories do
    getItemsByCategory c // returns a seq of item from web 
    |> Seq.iter (fun (item, _) -> // Want to process N items in parallel
        getMoreDataFromWeb item // from web
        executeLocalNetworkProcess item // local disk/network access; may take a while
        downloadBigFile item  // from web
        )

What's the best approach to implement it in F#?

Upvotes: 0

Views: 302

Answers (2)

Marc Sigrist

Reputation: 4190

You might want to include the PSeq module source code from the F# PowerPack in your own foundation library. Then, you can simply call PSeq.iter:

for category, description, _ as c in getCategories() do
    printfn "%s" category
    getItemsByCategory c
    // processItem stands for the per-item work; `process` itself is a reserved word in F#
    |> PSeq.iter(fun (item, description, _) -> processItem item)
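
If pulling in PSeq is not an option, roughly the same thing can be written against PLINQ directly, which is what PSeq wraps as far as I know. The sketch below is only an illustration: getCategories, getItemsByCategory and processItem are simplified stand-ins with made-up data, and maxParallel is an assumed value for N. It flattens all categories into one sequence first, so small categories do not leave connections idle, and caps the degree of parallelism.

```fsharp
open System.Collections.Concurrent
open System.Linq

// Hypothetical stand-ins for the question's functions (made-up data).
let getCategories () = [ "books"; "music"; "games" ]
let getItemsByCategory (category: string) =
    seq { for i in 1 .. 5 -> sprintf "%s-%d" category i }

// Thread-safe record of what has been processed; placeholder for the real work.
let processed = ConcurrentBag<string>()
let processItem (item: string) = processed.Add item

let maxParallel = 4   // assumed value of N

// One flat sequence of items across all categories.
let allItems = getCategories () |> Seq.collect getItemsByCategory

// At most maxParallel items are processed concurrently.
allItems
    .AsParallel()
    .WithDegreeOfParallelism(maxParallel)
    .ForAll(fun item -> processItem item)
```

Note that WithDegreeOfParallelism is an upper bound on concurrent work, not a guarantee that exactly N items are always running, and it does not treat the local step differently from the web steps.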

Upvotes: 1

Troy Kershaw

Reputation: 590

I have done something similar previously by splitting the sequence into batches of size n and processing the batch in parallel.

We can create batches of a sequence by using the code in this SO answer: https://stackoverflow.com/a/7518857/2461791
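
On FSharp.Core 4.0 or later, the built-in Seq.chunkBySize should produce the same kind of batches without a helper. A minimal sketch with made-up input (the numbers and the batch size of 8 are just for illustration):

```fsharp
// Split 20 numbers into batches of at most 8 items each.
let batches =
    [ 1 .. 20 ]
    |> Seq.chunkBySize 8   // yields arrays: 8 items, 8 items, then the remaining 4
    |> Seq.toList

// The last batch is shorter when the input does not divide evenly.
let batchSizes = batches |> List.map Array.length
```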

From there we just need to iterate through the items in each batch in parallel. I like to put it in the Array.Parallel module.

module Array =
    module Parallel =
        let batchIter size action array =
            let batchesOf n =
                Seq.mapi (fun i v -> i / n, v) >>
                Seq.groupBy fst >>
                Seq.map snd >>
                Seq.map (Seq.map snd)

            for batch in array |> batchesOf size do 
                batch
                |> Seq.toArray    
                |> Array.Parallel.iter action

The following code splits a list of 100 items into batches of 8 and prints the items of each batch in parallel.

[1..100]
|> Array.Parallel.batchIter 8 (printfn "%d")

To apply this to your code, you're looking at something like this:

let categories = getCategories()
for c in categories do
    let (category, _, _) = c
    printfn "%s" category
    getItemsByCategory c
    |> Array.Parallel.batchIter 8 (fun (item, _, _) ->
        processItem item // per-item work; `process` itself is a reserved word in F#
        )

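This approach, however, will wait for an entire batch to finish processing before starting the next batch, so one slow item leaves the other slots idle until it completes. As written, it also batches each category separately rather than across all categories.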
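
One way around the batch boundary, assuming FSharp.Core 4.7 or later, is Async.Parallel with its optional maxDegreeOfParallelism argument: a new item starts as soon as a running one finishes. The following is only a sketch under those assumptions. getCategories, getItemsByCategory and processItemAsync are hypothetical stand-ins with made-up data, and maxParallel is an assumed value for N.

```fsharp
// Hypothetical stand-ins for the question's functions (made-up data).
let getCategories () = [ "books"; "music"; "games" ]
let getItemsByCategory (category: string) =
    [ for i in 1 .. 5 -> sprintf "%s-%d" category i ]

let maxParallel = 4   // assumed value of N

// Placeholder for the per-item work (web calls, local step, download).
let processItemAsync (item: string) = async {
    do! Async.Sleep 20
    return item.ToUpperInvariant()
}

let results =
    getCategories ()
    |> Seq.collect getItemsByCategory   // flatten items of all categories
    |> Seq.map processItemAsync         // one async computation per item
    |> fun jobs -> Async.Parallel(jobs, maxDegreeOfParallelism = maxParallel)
    |> Async.RunSynchronously           // results come back in input order
```

This caps the total number of items in flight at maxParallel. It does not by itself let the local step run outside that limit; that would need a separate throttle around just the web calls.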

Upvotes: 1
