miljbee

Reputation: 350

Foreach-Object -Parallel, How to implement a timeout at thread level

ForEach-Object -Parallel is great, but sometimes the script block hangs, which blocks the rest of the code. ForEach-Object has a -TimeoutSeconds parameter, but it is a global one: when the timeout is reached, any objects still in the queue are dropped. In my opinion that makes it useless; the timeout should apply to each individual thread.

So I tried to write something short and efficient to get that behavior:

# Generate data
$list = 1..100

# Max Processing time for 1 item = 1s
$job_timeout = 1

# Processing data
$list |
    ForEach-Object -parallel {
        # using start-job to be able to cancel it if it takes too long
        $j = Start-Job -InputObject $_ -ScriptBlock {
                # Simulate processing time
                Start-Sleep -Milliseconds (Get-Random -Min 0 -max 3000)
                # returning processed data
                $input
            }
        # building result object
        [pscustomobject]@{
            item = $_
            # obtaining job result ($null if it's still running)
            result = $j | Wait-Job -Timeout $using:job_timeout | Receive-job
        }
        # clean-up
        $j | stop-job -PassThru | remove-job
    }

My problem is that, on second thought, embedding Start-Job inside ForEach-Object -Parallel seems dumb. I could probably get the same result using only jobs, without writing more lines.

So the question is: is there a good way to get a per-thread timeout in a ForEach-Object -Parallel block?

Thanks for your help

Upvotes: 4

Views: 848

Answers (2)

mklement0

Reputation: 439842

Indeed, the current semantics of the -TimeoutSeconds parameter of ForEach-Object's PowerShell (Core) 7+ -Parallel feature are unfortunate (as of PowerShell 7.3.6). To spell out your observation in more detail:

  • The -TimeoutSeconds interval is applied to the duration of the overall, typically throttled and therefore "batched" invocation[1] rather than to the runtime of each thread.

    • Therefore, a timeout can occur even if all individual threads completed in less than the specified timeout; a simple example:

      # This times out when the 3rd thread runs, because - due to ThrottleLimit 2 - 
      # it only starts after 1.5+ seconds, after the first 2 threads
      # have finished.
      1..4 | 
        ForEach-Object -TimeoutSeconds 2 -ThrottleLimit 2 -Parallel { 
          Start-Sleep -MilliSeconds 1500
          $_ # Pass the input object through.
        }
      
  • When a timeout occurs, the command terminates overall.

    • This means that threads for any remaining pipeline input then never even get to launch.
      • In the above example, only 1 and 2 print; input 4 never got processed, because processing of 3 caused the timeout.

GitHub issue #20197 asks for these shortcomings to be addressed.


As a - somewhat cumbersome - workaround, you can use the -AsJob parameter to make ForEach-Object return a job whose child jobs represent the individual threads, which can be monitored separately.

Applied to a slightly modified version of the example above that provokes a timeout for the 3rd input object:

# Use -AsJob to receive a job that allows monitoring the threads individually.
# Note that -AsJob cannot be combined with -TimeoutSeconds
$job = 
  1..4 | 
    ForEach-Object -AsJob -ThrottleLimit 2 -Parallel { 
      if ($_ -eq 3) {
        # Provoke a timeout error for this specific input.
        Start-Sleep -MilliSeconds 2500; $_
      } else {
        Start-Sleep -MilliSeconds 1500; $_
      }
    }

# Receive job output in a polling loop, and terminate child jobs
# that have run too long.
$timeout = 2
do {
  Start-Sleep -Milliseconds 500 # Sleep a little.
  # Get pending results.
  $job | Receive-Job
  # If any child jobs have been running for more than N seconds,
  # stop (terminate) them.
  # This will open up slots for more threads to spin up.
  foreach ($childJob in $job.ChildJobs.Where({ $_.State -eq 'Running' })) {
    if (([datetime]::now - $childJob.PSBeginTime).TotalSeconds -ge $timeout) {
      Write-Verbose -Verbose "Stopping job with ID $($childJob.Id) due to running longer than $timeout seconds..."
      $childJob | Stop-Job
    }
  }
} while ($job.ChildJobs.Where({ $_.State -in 'NotStarted', 'Running' }))

# Collect any output produced after the last Receive-Job call inside the loop.
$job | Receive-Job

Output:

1
2
4
VERBOSE: Stopping job with ID 4 due to running longer than 2 seconds...

Note:

  • Input 4 was still processed, despite the thread for input 3 having timed out.

  • The ID value of the child job isn't really meaningful except to distinguish it from other child jobs; if you want to know what input object caused the timeout, you'll have to echo it as part of the script block (at the start, before a timeout can occur) - the job object doesn't contain this information.


[1] More accurately, only a fixed number of threads are allowed to run at a time, based on the -ThrottleLimit argument, which defaults to 5. If more threads are needed, they have to wait until "slots" open up, which happens when currently executing threads finish.
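
To make the throttling described in this footnote tangible, here is a minimal timing sketch. The input count, the 1-second sleep, and the $elapsed variable are arbitrary choices for illustration; exact timings will vary with thread start-up overhead.

```powershell
# Four inputs that each take about 1 second, with at most 2 threads at a time.
$elapsed = Measure-Command {
    1..4 | ForEach-Object -ThrottleLimit 2 -Parallel {
        Start-Sleep -Seconds 1
    }
}

# Inputs 3 and 4 have to wait for a slot, so the total should be a bit over
# 2 seconds (two waves of two threads) rather than about 1 second.
'{0:N1} s' -f $elapsed.TotalSeconds
```

This is also why an overall -TimeoutSeconds value can be exceeded even though no single thread ran longer than the timeout.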

Upvotes: 1

Santiago Squarzon

Reputation: 60883

Perhaps ForEach-Object -Parallel is not the right cmdlet for this problem; it might be simpler to use Start-ThreadJob in this case.

You could, in theory, implement similar logic by using -AsJob with ForEach-Object -Parallel and then targeting the .ChildJobs property, but I would personally not try to force that; it would only make things more complicated than they need to be.

$timeout = [timespan]::FromSeconds(1)
[System.Collections.Generic.List[object]] $jobs = 1..100 |
    ForEach-Object {
        Start-ThreadJob {
            # Simulate processing time
            Start-Sleep -Milliseconds (Get-Random -Min 0 -Maximum 3000)
            # returning processed data
            $using:_
        } -ThrottleLimit 7
    }



do {
    for ($i = 0; $i -lt $jobs.Count; $i++) {
        $job = $jobs[$i]
        
        # if the job is completed
        if ($job.State -eq 'Completed') {
            # get the output
            Receive-Job $job
            # and remove it from the job list, stepping the index back so
            # the job that shifts into this slot isn't skipped
            $jobs.RemoveAt($i)
            $i--
            continue
        }
        
        # if the job is not running,
        # it could be either stopped or failed
        if ($job.State -ne 'Running') {
            # ignore this one
            continue
        }
        
        # if the time difference since its start is greater than the timeout
        if ([datetime]::Now - $job.PSBeginTime -gt $timeout) {
            # stop it, allowing a new job to start (this is based on `-ThrottleLimit`)
            $job.StopJob($true, 'You are out of time')
        }
    }
}
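# keep looping while any job is queued, running, or completed but not yet received
while ($jobs.Where({ $_.State -in 'NotStarted', 'Running', 'Completed' }))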

$jobs # <- Contains all jobs that ran out of time
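
If you also need to know which inputs timed out, one option is to tag each thread job with its input through the -Name parameter of Start-ThreadJob. The following is only a sketch under assumptions: the 'item-' name prefix, the even/odd workload, and the $timedOut and $results variables are made up for illustration.

```powershell
$timeout = [timespan]::FromSeconds(1)

# Name each job after its input so it can be identified later.
$jobs = 1..6 | ForEach-Object {
    Start-ThreadJob -Name "item-$_" -ThrottleLimit 3 -ArgumentList $_ -ScriptBlock {
        param($item)
        # Hypothetical workload: odd inputs hang, even inputs finish quickly.
        if ($item % 2) { Start-Sleep -Seconds 5 } else { Start-Sleep -Milliseconds 200 }
        $item
    }
}

# Poll until no job is queued, running, or in the middle of stopping.
while ($jobs.Where({ $_.State -in 'NotStarted', 'Running', 'Stopping' })) {
    foreach ($job in $jobs.Where({ $_.State -eq 'Running' })) {
        # Stop any job that has been running longer than the timeout.
        if ($job.PSBeginTime -and ([datetime]::Now - $job.PSBeginTime) -gt $timeout) {
            $job.StopJob()
        }
    }
    Start-Sleep -Milliseconds 100
}

# Stopped jobs are the ones that ran out of time; their names carry the input.
$timedOut = $jobs.Where({ $_.State -eq 'Stopped' }).Name
$results  = $jobs | Where-Object State -eq 'Completed' | Receive-Job
$jobs | Remove-Job -Force

"Timed out: $($timedOut -join ', ')"
"Results:   $($results -join ', ')"
```

With this workload the odd inputs should end up in $timedOut and the even ones in $results, though timing on a heavily loaded machine could shift that.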

Upvotes: 3
