gear_9
gear_9

Reputation: 1

F# - Need help converting this to use a threadpool

I am new to F# and I have frankensteined the code below from various examples I found online in an attempt to get a better understanding of how I can use it. Currently the code below reads in a list of machines from a file and pings each of the machines. I had to divide the initial array from the file up into a smaller arrays of 25 machines to control the number of concurrent actions otherwise it takes far to long to map out the list of machines. I would like be able to use a threadpool to manage the threads but I have not found a way to make it work. Any guidance would be great. I am not able to make this work:

let creatework  = FileLines|> Seq.map (fun elem -> ThreadPool.QueueUserWorkItem(new WaitCallback(dowork), elem))

Here is the complete code:

open System.Threading
open System
open System.IO

let filePath = "c:\qa\machines.txt"

let FileLines = File.ReadAllLines(filePath)

let count = FileLines.Length/25

type ProcessResult = { exitCode : int; stdout : string; stderr : string } 

let executeProcess (exe,cmdline) = 
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline) 
    psi.UseShellExecute <- false
    psi.RedirectStandardOutput <- true 
    psi.RedirectStandardError <- true 
    psi.CreateNoWindow <- true
    let p = System.Diagnostics.Process.Start(psi, EnableRaisingEvents = true) 
    let output = new System.Text.StringBuilder()
    let error = new System.Text.StringBuilder() 
    p.OutputDataReceived.Add(fun args -> output.AppendLine(args.Data)|> ignore) 
    p.ErrorDataReceived.Add(fun args -> error.AppendLine(args.Data) |> ignore) 
    p.BeginErrorReadLine() 
    p.BeginOutputReadLine()
    p.WaitForExit()
    { exitCode = p.ExitCode; stdout = output.ToString(); stderr = error.ToString() } 

let dowork machinename=
    async{
        let exeout = executeProcess(@"c:\windows\system32\ping.exe", "-n 1 " + machinename)
        let exelines = 
            if exeout.stdout.Contains("Reply from") then Console.WriteLine(machinename + " " + "REPLY")
            elif exeout.stdout.Contains("Request timed out.") then Console.WriteLine(machinename + " " + "RTO")
            elif exeout.stdout.Contains("Ping request could not find host") then Console.WriteLine(machinename + " " + "Unknown Host")
            else Console.WriteLine(machinename + " " + "ERROR")
        exelines
        }

printfn "%A" (System.DateTime.Now.ToString())

for i in 0..count do
    let x = i*25
    let y = if i = count then FileLines.Length-1 else (i+1)*25
    printfn "%s %d" "X equals: " x
    printfn "%s %d" "Y equals: " y
    let filesection = FileLines.[x..y]
    let creatework = filesection |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore
    creatework

printfn "%A" (System.DateTime.Now.ToString())
printfn "finished"

UPDATE: The code below works and provides a framework for what I want to do. The link that was referenced by Tomas Petricek did have the bits of code that made this work. I just had to figure which example was the right one. It is within 3 seconds of duplicate framework written in Java so I think I am headed in the right direction. I hope the example below will be useful to anyone else trying to thread out various executables in F#:

open System
open System.IO
open System.Diagnostics

let filePath = "c:\qa\machines.txt"

let FileLines = File.ReadAllLines(filePath)

type Process with
    static member AsyncStart psi =
        let proc = new Process(StartInfo = psi, EnableRaisingEvents = true)
        let asyncExit = Async.AwaitEvent proc.Exited
        async {
            proc.Start() |> ignore
            let! args = asyncExit
            return proc
        } 

let shellExecute(program : string, args : string) =
    let startInfo =
        new ProcessStartInfo(FileName = program, Arguments = args,
            UseShellExecute = false,
            CreateNoWindow = true,
            RedirectStandardError = true,
            RedirectStandardOutput = true)
    Process.AsyncStart(startInfo)

let dowork (machinename : string)=
    async{
        let nonbtstat = "NONE"
        use! pingout = shellExecute(@"c:\windows\system32\ping.exe", "-n 1 " + machinename)
        let pingRdToEnd = pingout.StandardOutput.ReadToEnd()
        let pingresults =
            if pingRdToEnd.ToString().Contains("Reply from") then (machinename + " " + "REPLY")
            elif pingRdToEnd.ToString().Contains("Request timed out.") then (machinename + " " + "RTO")
            elif pingRdToEnd.ToString().Contains("Ping request could not find host") then (machinename + " " + "Unknown Host")
            else (machinename + " " + "PING_ERROR")
        if pingresults.ToString().Contains("REPLY") then
            use! nbtstatout = shellExecute(@"c:\windows\system32\nbtstat.exe", "-a " + machinename)
            let nbtstatRdToEnd = nbtstatout.StandardOutput.ReadToEnd().Split('\n')
            let nbtstatline = Array.tryFind(fun elem -> elem.ToString().Contains("<00>  UNIQUE      Registered")) nbtstatRdToEnd
            return Console.WriteLine(pingresults + nbtstatline.Value.ToString())
        else return Console.WriteLine(pingresults + " " + nonbtstat)
        }

printfn "%A" (System.DateTime.Now.ToString())

let creatework = FileLines |> Seq.map dowork |> Async.Parallel |> Async.RunSynchronously|>ignore
creatework

printfn "%A" (System.DateTime.Now.ToString())
printfn "finished" 

Upvotes: 0

Views: 678

Answers (1)

Tomas Petricek
Tomas Petricek

Reputation: 243096

The main problem with your code is that executeProcess is a synchronous function that takes a long time to run (it runs the ping.exe process and waits for its result). The general rule is that tasks in a thread pool should not block for a long time (because then they block thread pool threads, which means that the thread pool cannot efficiently schedule other work).

I think you can solve this quite easily by making executeProcess asynchronous. Instead of calling WaitForExit (which blocks), you can wait for the Exitted event using Async.AwaitEvent:

let executeProcess (exe,cmdline) = async {
    let psi = new System.Diagnostics.ProcessStartInfo(exe,cmdline)  
    psi.UseShellExecute <- false 
    // [Lots of stuff omitted]
    p.BeginOutputReadLine() 
    let! _ = Async.AwaitEvent p.Exited
    return { exitCode = p.ExitCode
             stdout = output.ToString(); stderr = error.ToString() } }

This should unblock threads in the thread pool and so you'll be able to use Async.Parallel on all the URLs from the input array without any manual scheduling.

EDIT As @desco pointed out in a comment, the above is not quite right if the process exits before the AwaitEvent line is reached (before it may miss the event). To fix that, you need to use Event.guard function, which was discussed in this SO question:

Upvotes: 6

Related Questions