Reputation: 13
I'm trying to understand parallel processing in PowerShell 5. Everything was straightforward until I came upon the overload of `BeginInvoke` with two parameters: `BeginInvoke<TInput,TOutput>`. I found an example on a blog that works well for the `TOutput` parameter:
```powershell
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = { Get-Random -Maximum 100 }
$Instances = (1..5) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State = $Instance.BeginInvoke($Inputs,$Outputs)
    }
}
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
```
Running this and looking at `$Outputs` yields the expected result:

```
PS C:\Users\xyz> $Outputs
10
74
41
56
59
```
However, when I tried to pass something to the script block via `$Inputs`, I never succeeded. I know that you can do it with `AddParameters`, but I don't like to leave a stone unturned and would like to understand how to do it with this overload. I have spent a week looking at resources on the net but could not find a way to do it correctly. From what I know, the input is passed via the pipeline, just as `$Outputs` is collected at the end. Here is one way (of the thousand that I tried...) that doesn't work:
```powershell
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
    Bias = 100 }
)
$ScriptBlock = {
    Param
    (
        #Hoping to get value from pipeline
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [System.Management.Automation.PSDataCollection]$Bias
    )
    $BiasValue = [PSCustomObject]$Bias[0]
    (Get-Random -Maximum 100) + $BiasValue[0].'Bias' }
#Create the threads
$Instances = (1..10) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State = $Instance.BeginInvoke($Inputs,$Outputs)
    }
}
#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
```
Of course, this code doesn't do anything useful; it's just a test to find out how to get the `$Inputs` value into the script block. With this version, `$Outputs` stays completely empty, which points to an error in the script block.
Any help would be appreciated.
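For comparison, here is a minimal sketch of the `AddParameter` route mentioned in the question, assuming Windows PowerShell 5.1 or later. The value is bound to a named `Bias` parameter of the script block (the name is simply carried over from the example above), so neither an input collection nor a `process` block is involved; `EndInvoke()` blocks until each instance finishes and returns its output.

```powershell
# Sketch: pass the bias as a named parameter via AddParameter() instead of
# through pipeline input. Assumes Windows PowerShell 5.1 or later.
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()

$ScriptBlock = {
    param([int] $Bias)   # bound by name, not from the pipeline
    (Get-Random -Maximum 100) + $Bias
}

$Instances = 1..5 | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock).AddParameter('Bias', 100)
    $Instance.RunspacePool = $RunspacePool
    [pscustomobject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke()
    }
}

# EndInvoke() waits for each instance and returns its output collection,
# whose items are enumerated into $Results.
$Results = foreach ($i in $Instances) {
    $i.Instance.EndInvoke($i.State)
    $i.Instance.Dispose()
}
$RunspacePool.Dispose()
$Results
```

Since `Get-Random -Maximum 100` returns 0 to 99, each result should fall between 100 and 199.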
Upvotes: 1
Reputation: 440037
The following two changes are required to make your code work:
- Define `$ScriptBlock` with a pipeline-binding parameter of type `[pscustomobject]`, not of type `[System.Management.Automation.PSDataCollection[pscustomobject]]`, and process that parameter in a `process { ... }` block.
  - Pipeline-binding parameters do not receive collections; they receive the elements of collections provided as pipeline input, because collections (enumerables) are enumerated in the pipeline, i.e. their elements are sent to the pipeline one by one.
  - To ensure processing of each pipeline input object, a `process { ... }` block is required; see Piping Objects to Functions.
- Call `.Complete()` on the `` [System.Management.Automation.PSDataCollection`1] `` collection that serves as the pipeline input; this is required for processing to complete (end).
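Stripped down to a single `[powershell]` instance without a runspace pool, the two changes might look like the sketch below (assuming PowerShell 5.1 or later for the `::new()` syntax; the `Bias` values 100, 200 and 300 are made up for illustration). Because the input collection is completed before `BeginInvoke()` is called, `EndInvoke()` can return once all three elements have gone through the `process` block.

```powershell
# Minimal single-instance sketch of the two changes (assumes PowerShell 5.1+).
$Inputs  = [System.Management.Automation.PSDataCollection[psobject]]::new()
$Outputs = [System.Management.Automation.PSDataCollection[psobject]]::new()

# Illustrative input: three objects, each bound to the pipeline parameter separately.
1..3 | ForEach-Object { $Inputs.Add([pscustomobject]@{ Bias = $_ * 100 }) }
$Inputs.Complete()   # change 2: signal that no more input will arrive

$ScriptBlock = {
    param(
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [pscustomobject] $Bias   # change 1: receives one element, not the collection
    )
    process {                    # change 1: runs once per input element
        $Bias.Bias + 1
    }
}

$Instance = [powershell]::Create().AddScript($ScriptBlock)
$State = $Instance.BeginInvoke($Inputs, $Outputs)
$null = $Instance.EndInvoke($State)   # wait for the pipeline to finish
$Instance.Dispose()

$Outputs   # should list 101, 201 and 301
```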
Here's the modified version of your code (with as few modifications as necessary; additional improvements are possible). Look for `# !!` comment lines, which mark the changes:
```powershell
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
    Bias = 100 }
)
# !! Call .Complete() on the input collection to ensure that pipeline
# !! processing ends.
$Inputs.Complete()
$ScriptBlock = {
    Param
    (
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [pscustomobject]$Bias # !! Define the parameter as [pscustomobject]
    )
    process { # !! Use a `process` block to process each input object.
        (Get-Random -Maximum 100) + $Bias.Bias }
}
#Create the threads
$Instances = (1..10) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State = $Instance.BeginInvoke($Inputs, $Outputs)
    }
}
#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
# !! Print the output objects
$Outputs
# !! Clean up.
$Instances.Instance.Dispose()
$RunspacePool.Dispose()
```
Note that `` [System.Management.Automation.PSDataCollection`1] `` is thread-safe, so it shouldn't be a problem that multiple runspaces share the same output collection instance, `$Outputs`.
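A quick way to check the shared-output claim is to let several instances write many objects into one `$Outputs` and count them afterwards. This is only a sketch: the script block `{ 1..100 }` and the instance count are arbitrary choices, and the empty, already completed `$Inputs` exists only so that the two-argument `BeginInvoke()` overload can be used.

```powershell
# Sketch: ten instances in one runspace pool all append to the same $Outputs.
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()

$Inputs = [System.Management.Automation.PSDataCollection[psobject]]::new()
$Inputs.Complete()   # no pipeline input is needed for this check
$Outputs = [System.Management.Automation.PSDataCollection[psobject]]::new()

$Instances = 1..10 | ForEach-Object {
    $Instance = [powershell]::Create().AddScript({ 1..100 })   # each writes 100 objects
    $Instance.RunspacePool = $RunspacePool
    [pscustomobject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke($Inputs, $Outputs)
    }
}

# Wait for every instance, then clean up.
foreach ($i in $Instances) {
    $null = $i.Instance.EndInvoke($i.State)
    $i.Instance.Dispose()
}
$RunspacePool.Dispose()

$Outputs.Count
```

If no writes were lost, the final count should be 1000 (10 instances x 100 objects).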
Upvotes: 0
Reputation: 60978
Hopefully this example helps you get a better understanding. The key issue with your code is that your script block is missing its `process` block, and the parameter of the script block should be `psobject` or just `object`, since what the threads receive from the pipeline (`TInput`) is just a `pscustomobject`; they do not receive the entire `PSDataCollection<TInput>`. It's also hard to give a realistic example from PowerShell because PowerShell itself is single-threaded, so the example below simulates the pipeline input.
```powershell
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = {
    Param(
        # all threads will receive this and process it in parallel
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [psobject] $Bias
    )
    process {
        [pscustomobject]@{
            ThreadId      = [runspace]::DefaultRunspace.Id
            PipelineInput = $Bias
            Result        = (Get-Random -Maximum 100) + $Bias.Value
        }
        Start-Sleep 1
    }
}
$jobs = [System.Collections.Generic.List[object]]::new()
#Create the threads
1..10 | ForEach-Object {
    # simulate input from pipeline
    $Inputs.Add([pscustomobject]@{ Value = $_ })
    # now start processing
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    $jobs.Add([PSCustomObject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke($Inputs, $Outputs)
    })
    # simulate output: ReadAll() returns the items collected so far as a
    # new collection that we can safely read, and clears $Outputs
    if ($Outputs.Count) {
        $Outputs.ReadAll()
    }
}
$Inputs.Complete()
# now block until processing is done
do {
    $id = [System.Threading.WaitHandle]::WaitAny($jobs.State.AsyncWaitHandle, 200)
    # if there is any output from threads, consume it
    if ($Outputs.Count) {
        $Outputs.ReadAll()
    }
    if ($id -eq [System.Threading.WaitHandle]::WaitTimeout) {
        continue
    }
    $job = $jobs[$id]
    $job.Instance.EndInvoke($job.State)
    $job.Instance.Dispose()
    $jobs.RemoveAt($id)
}
while ($jobs.Count)
$RunspacePool.Dispose()
```
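To see in isolation what a pipeline-bound parameter actually receives, the following sketch feeds a completed `PSDataCollection` to a script block and records the type of each bound value. For simplicity it uses the synchronous `Invoke(IEnumerable)` overload instead of `BeginInvoke()`; that swap is only to keep the example short, and the element-by-element binding it shows is the same pipeline enumeration described above.

```powershell
# Sketch: which object does the ValueFromPipeline parameter receive?
$Inputs = [System.Management.Automation.PSDataCollection[psobject]]::new()
1..3 | ForEach-Object { $Inputs.Add([pscustomobject]@{ Value = $_ }) }
$Inputs.Complete()   # close the collection so enumeration ends

$ScriptBlock = {
    param([Parameter(Mandatory = $true, ValueFromPipeline = $true)] [psobject] $Bias)
    process {
        # Record the element's value and the runtime type of what was bound.
        [pscustomobject]@{ Value = $Bias.Value; TypeName = $Bias.GetType().FullName }
    }
}

$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Results = $Instance.Invoke($Inputs)   # synchronous run with pipeline input
$Instance.Dispose()
$Results
```

Each of the three output rows should describe a single element (`Value` 1, 2 or 3), not the `PSDataCollection` as a whole.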
Upvotes: 0