Max

Reputation: 20004

Distributing workload

In my application I have a number of objects that can perform long-running calculations; let's call them clients. I also have a number of objects that describe the tasks to be calculated. Something like this:

open System.Threading

let clients = [1..4]
let tasks = [1..20]

// Simulates a long-running calculation of task t on client c
let calculate c t =
    printfn "Starting task %d with client %d" t c
    Thread.Sleep(3000)
    printfn "Finished task %d with client %d" t c

A client can run only one task at a time. I want to create a function/class that assigns the tasks to the clients and performs the calculations. I've done this in C# using a queue of clients: as soon as a task is assigned to a client, the client is removed from the queue, and when the calculation finishes, the client is released and placed back in the queue. Now I'm interested in implementing this in a functional way. I've experimented with asynchronous workflows, but I cannot think of a proper way to implement this.

Here's some F#-like pseudocode that I tried to make work, but couldn't:

let rec distribute clients tasks calculate tasks_to_wait =
    match clients, tasks with
    | _ , [] -> ()           // No tasks - we're done!
    | [], th::tt ->          // No free clients, but there are still tasks to calculate.
             let released_client = ?wait_for_one? tasks_to_wait 
             distribute [released_client] tasks calculate ?tasks_to_wait?

    | ch::ct, th::tt ->      // There are free clients. 
        let task_to_wait() =
             do calculate ch th 
             ch
        distribute ct tt calculate (task_to_wait::tasks_to_wait)

How do I do this? Is there a functional design pattern to solve this task?

Upvotes: 2

Views: 218

Answers (1)

Tomas Petricek

Reputation: 243041

There are various ways to do this. It would be perfectly fine to use a concurrent collection from .NET 4.0 (such as a queue) in F#; this is often the easiest option if the collection already implements the functionality you need.
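As a rough sketch of the concurrent-collection option, assuming `BlockingCollection<T>` is the collection you pick and that clients and tasks are plain integers as in the question (the name `runWithPool` is made up for illustration):

```fsharp
open System.Collections.Concurrent

// Hypothetical helper (not from the question): runs every task on some free
// client and returns once all calculations have finished.
// Assumes there is at least one client; otherwise Take() would wait forever.
let runWithPool (calculate: int -> int -> unit) (clients: int list) (tasks: int list) =
    // Holds the clients that are currently free
    use free = new BlockingCollection<int>()
    for c in clients do free.Add(c)
    let running =
        [ for t in tasks do
            // Blocks the calling thread until some client is free
            let c = free.Take()
            // Run the calculation in the background, then release the client
            let work =
                async {
                    try
                        calculate c t
                    finally
                        free.Add(c) }
            yield Async.StartAsTask work ]
    // Wait for the calculations that are still in flight
    for r in running do r.Wait()
```

With the definitions from the question this would be called as `runWithPool calculate clients tasks`. Note that `Take()` blocks the calling thread while it waits for a free client.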

The problem requires concurrent access to some resource, so it cannot be solved in a purely functional way, but F# provides agents, which give you a nice alternative way to solve the problem.

The following snippet implements an agent that schedules the work items. It uses its own mailbox to hold the available clients, which gives you a nice way to wait for the next available client. After the agent is created, you just send it all the initial clients. It keeps iterating over the tasks as long as clients are available. When no client is available, it waits (asynchronously, without blocking a thread) until some earlier calculation completes and its client is sent back to the agent's mailbox:

let workloadAgent = MailboxProcessor.Start(fun agent -> 
  // The body of the agent, runs as a recursive loop that iterates
  // over the tasks and waits for client before it starts processing
  let rec loop tasks = async {
    match tasks with 
    | [] -> 
        // No more work to schedule (but calculations that were started
        // earlier may still be running in the background)
        ()
    | task::tasks ->
        // Wait for the next client to become available by receiving the
        // next message from the inbox - the messages are clients
        let! client = agent.Receive()
        // Spawn processing of the task using the client
        async { calculate client task
                // When the processing completes, send the client
                // back to the agent, so that it can be reused
                agent.Post(client) }
        |> Async.Start 
        // Continue processing the rest of the tasks
        return! loop tasks }

  // Start the agent with the initial list of tasks
  loop tasks )

// Add all clients to the agent, so that it can start
for client in clients do workloadAgent.Post(client)
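The agent above returns from its loop as soon as the last task has been handed out, so the caller cannot tell when the remaining calculations finish. One possible way to wait for them, sketched under the assumption that clients and tasks are integers as in the question (the function `runWithAgent` and the use of `ManualResetEventSlim` are illustrative additions, not part of the snippet above):

```fsharp
open System.Threading

// Hypothetical wrapper: schedules all tasks through an agent and blocks the
// caller until every calculation has completed.
let runWithAgent (calculate: int -> int -> unit) (clients: int list) (tasks: int list) =
    let finished = new ManualResetEventSlim(false)
    let agent = MailboxProcessor<int>.Start(fun agent ->
        let rec loop tasks = async {
            match tasks with
            | [] ->
                // All tasks are scheduled. Once every client has come back
                // to the mailbox, no calculation can still be running.
                for _client in clients do
                    let! _returned = agent.Receive()
                    ()
                finished.Set()
            | task :: rest ->
                // Wait (asynchronously) for the next free client
                let! client = agent.Receive()
                async {
                    try
                        calculate client task
                    finally
                        // Return the client even if the calculation throws
                        agent.Post(client) }
                |> Async.Start
                return! loop rest }
        loop tasks)
    // Hand the initial clients to the agent so that it can start
    for client in clients do agent.Post(client)
    // Block the caller until the agent signals completion
    finished.Wait()
```

Calling `runWithAgent calculate clients tasks` would then return only after the last "Finished task" line has been printed. The final `Wait()` does block the calling thread; only the scheduling inside the agent is non-blocking.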

If you're not familiar with F# agents, then the MSDN section Server-Side Programming has some useful information.

Upvotes: 6
