假设我们有一个 Mac 应用程序,它使用
Process
(以前称为 NSTask
)来运行其他一些可执行文件,该可执行文件将数据写入 STDOUT 和 STDERR。此过程可能需要很长时间才能完成其工作。在 Swift Concurrency 之前,我们会这样做:
// Assume the object with this function is bound to the MainActor:
func doSomeWork()
{
DispatchQueue.global(qos: .default).async
{
let task = Process()
let errorPipe = Pipe()
let errorFileHandle = errorPipe.fileHandleForReading
task.standardError = errorPipe
let outputPipe = Pipe()
let outputFileHandle = outputPipe.fileHandleForReading
task.standardOutput = outputPipe
// Need to read fileHandles because if more than 64kb of data is written, they'll block the whole thread until we read them.
var outputData = Data(capacity: 1000)
var errorData = Data(capacity: 1000)
// Semaphore to make sure our readability handlers exit before we continue. NSTask does not guarantee they'll finish before -waitUntilExit fires.
let sema = DispatchSemaphore(value: 0)
outputFileHandle.readabilityHandler = { handle in
let newData = handle.availableData()
if newData.count == 0 {
// end-of-data signal is an empty data object.
outputFileHandle.readabilityHandler = nil
sema.signal()
} else {
outputData.append(newData)
}
}
errorFileHandle.readabilityHandler = { handle in
let newData = handle.availableData()
if newData.count == 0 {
// end-of-data signal is an empty data object.
errorFileHandle.readabilityHandler = nil
sema.signal()
} else {
errorData.append(newData)
}
}
task.terminationHandler = { task in
sema.signal()
}
defer {
errorFileHandle.closeFile()
outputFileHandle.closeFile()
}
do {
try task.run()
} catch {
// handle error
}
sema.wait()
sema.wait()
sema.wait()
// Use outputData and errorData to do whatever...
}
}
请注意,
readabilityHandlers
不保证它们在哪个线程上执行。并且 Process
不保证每个 readabilityHandler
在调用 task.terminationHandler
时已完成写入数据。 Process 也不保证 readabilityHandlers 在 waitUntilExit()
返回时完成。 不要建议这种替代方案;我去过那里并且我有T恤。
因此,我使用信号量来确保拼图的所有三块都 100% 完成,然后再继续。
outputData
和 errorData
是跨线程共享的可变状态,但由于信号量,可以保证在两个 readabilityHandlers
的所有写入完成之前不会从任何一个对象读取任何内容。
我知道我可以使用
Actor
来包装 outputData
和 errorData
来满足编译器关于 Mutatation of captured var
outputData in concurrently-executing code.
的错误,并形式化我当前的“手动”防范竞争条件(以牺牲更多的上下文切换开销。)
在 Swift 并发中,线程总是应该向前推进。这意味着没有信号量。因此,即使我将
Process
和 readabilityHandlers
移至 Actor
,我如何正确“等待”所有三部分都完成?
withCheckedContinuation
或withCheckedThrowingContinuation
)。然后,您可以同时启动它们,然后await
这些任务。正如您提到的,我们会谨慎地避免使用信号量。
在这种情况下,我可能会为
AsyncSequence
制作一个availableData
:
extension Pipe {
struct AsyncAvailableData: AsyncSequence {
typealias Element = Data
let pipe: Pipe
func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
AsyncStream { continuation in
pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
let data = handle.availableData
guard !data.isEmpty else {
continuation.finish()
return
}
continuation.yield(data)
}
continuation.onTermination = { _ in
pipe.fileHandleForReading.readabilityHandler = nil
}
}.makeAsyncIterator()
}
}
var availableData: AsyncAvailableData { AsyncAvailableData(pipe: self) }
}
然后我会使用
async let
(SE-0317) 同时运行这些:
func doSomeWork() async throws {
let process = Process()
process.executableURL = command
let inputPipe = Pipe()
let outputPipe = Pipe()
let errorPipe = Pipe()
process.standardError = errorPipe
process.standardOutput = outputPipe
process.standardInput = inputPipe
// optionally, you might want to return whatever non-zero termination status code the process returned
process.terminationHandler = { process in
if process.terminationStatus != 0 {
exit(process.terminationStatus)
}
}
async let outputTask = outputPipe.availableData.joined()
async let errorTask = errorPipe.availableData.joined()
try process.run()
let outputData = await outputTask
let errorData = await errorTask
…
}
地点:
extension Pipe.AsyncAvailableData {
func joined() async -> Data {
var data = Data()
for await availableData in self {
data.append(availableData)
}
return data
}
}
注意,我创建了一个
Data
的异步序列,表示提供给 availableData
的 readabilityHandler
。通常,我会创建各个数据字节的 AsyncSequence
,如 https://stackoverflow.com/a/76941591/1271826 所示。这样,如果您想逐行处理它们,您可以免费获得该行为。但是,如果您只是一次性将整个流加载到内存中,则无需添加将 Data
序列转换为字节流,然后再转换回一个大 Data
的开销。
虽然我希望我回答了您上面的问题,但为了未来的读者,我必须注意,尝试将所有
standardOutput
和 standardError
加载到各自的 Data
中存在严重的缺点。具体来说,您必须同时将完整输出加载到内存中。
我们通常希望在数据传入时对其进行处理,而不是尝试一次将整个数据保存在内存中。在琐碎的用例中,将整个内容加载到 RAM 中效果很好,但随着输入的增加,它变得越来越不实用。买者自负。