我正在尝试了解 PS5 中的并行处理。一切都很简单,直到我遇到带有两个参数的 BeginInvoke 重载:BeginInvoke
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = { Get-Random -Maximum 100 }
$Instances = (1..5) | ForEach-Object {
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
[PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs,$Outputs)
}
}
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
运行此程序并查看 $Output 会产生预期结果:
PS C:\Users\xyz> $输出 10 74 41 56 59
现在,当我尝试通过 $Inputs 将某些内容传递给脚本块时,我从未成功。我知道您可以使用 AddParameters 来完成此操作,但我不想千方百计,并且想了解如何使用此重载来完成此操作。现在花了一周时间在网上查看资源,但找不到正确的方法。据我所知,这将通过管道传递,就像最后的 $Outputs 一样。这是一种行不通的方法(从我尝试过的数千种方法中......):
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
#Let's say I want to add a bias to the random value
$Inputs.Add( [PSCustomObject]@{
Bias = 100 }
)
$ScriptBlock = {
Param
(
#Hoping to get value from pipeline
[Parameter(Mandatory = $true, ValueFromPipeline = $true)]
[System.Management.Automation.PSDataCollection]$Bias
)
$BiasValue = [PSCustomObject]$Bias[0]
(Get-Random -Maximum 100) + $BiasValue[0].'Bias' }
#Create the threads
$Instances = (1..10) | ForEach-Object {
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
[PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs,$Outputs)
}
}
#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
当然,这段代码没有做任何有用的事情,它只是一个测试,以了解如何获取 ScriptBlock 中的 $Inputs 值。现在 $Outputs 完全为空,指向 ScriptBlock 中的错误。
如有任何帮助,我们将不胜感激。
希望这个例子可以帮助您更好地理解。您的代码的关键问题是您的脚本块缺少其
process
块,并且脚本块的参数应该是 psobject
或只是 object
,因为线程从管道 (TInput
) 接收的只是一个 pscustomobject
,他们没有收到整个PSDataCollection<TInput>
。也很难给出 PowerShell 的示例,因为它是单线程的。
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = {
Param(
# all threads will receive this and process it in parallel
[Parameter(Mandatory = $true, ValueFromPipeline = $true)]
[psobject] $Bias
)
process {
$BiasValue = $Bias.Value
[pscustomobject]@{
ThreadId = [runspace]::DefaultRunspace.Id
Result = (Get-Random -Maximum 100) + $BiasValue
}
Start-Sleep 1
}
}
$jobs = [System.Collections.Generic.List[object]]::new()
#Create the threads
1..10 | ForEach-Object {
# simulate input from pipeline
$Inputs.Add([pscustomobject]@{ Value = $_ })
# now start processing
$Instance = [powershell]::Create().AddScript($ScriptBlock)
$Instance.RunspacePool = $RunspacePool
$jobs.Add([PSCustomObject]@{
Instance = $Instance
State = $Instance.BeginInvoke($Inputs, $Outputs)
})
# simulate output, ReadAll() will copy the output into a new
# collection that we can safely read and clear itself
if ($Outputs.Count) {
$Outputs.ReadAll()
}
}
$Inputs.Complete()
# now block until processing is done
do {
$id = [System.Threading.WaitHandle]::WaitAny($jobs.State.AsyncWaitHandle, 200)
# if there is any output from threads, consume it
if ($Outputs.Count) {
$Outputs.ReadAll()
}
if ($id -eq [System.Threading.WaitHandle]::WaitTimeout) {
continue
}
$job = $jobs[$id]
$job.Instance.EndInvoke($job.State)
$job.Instance.Dispose()
$jobs.RemoveAt($id)
}
while ($jobs.Count)
$RunspacePool.Dispose()