现代 Swift 并发中的 NSTask/Process + NSPipe + NSFileHandle

问题描述 投票:0回答:1

背景

假设我们有一个 Mac 应用程序,它使用

Process
(以前称为
NSTask
)来运行其他一些可执行文件,该可执行文件将数据写入 STDOUT 和 STDERR。此过程可能需要很长时间才能完成其工作。在 Swift Concurrency 之前,我们会这样做:

// Assume the object with this function is bound to the MainActor:
func doSomeWork()
{
    DispatchQueue.global(qos: .default).async 
    {
        let task = Process()
            
        let errorPipe = Pipe()
        let errorFileHandle = errorPipe.fileHandleForReading
        task.standardError = errorPipe
            
        let outputPipe = Pipe()
        let outputFileHandle = outputPipe.fileHandleForReading
        task.standardOutput = outputPipe
            
        //  Need to read fileHandles because if more than 64kb of data is written, they'll block the whole thread until we read them.
        var outputData = Data(capacity: 1000)
        var errorData = Data(capacity: 1000)

        // Semaphore to make sure our readability handlers exit before we continue. NSTask does not guarantee they'll finish before -waitUntilExit fires.
        let sema = DispatchSemaphore(value: 0)

        outputFileHandle.readabilityHandler = { handle in
            let newData = handle.availableData()
            if newData.count == 0 {
                // end-of-data signal is an empty data object.
                outputFileHandle.readabilityHandler = nil
                sema.signal()
            } else {
                outputData.append(newData)
            }
        }

        errorFileHandle.readabilityHandler = { handle in
            let newData = handle.availableData()
            if newData.count == 0 {
                // end-of-data signal is an empty data object.
                errorFileHandle.readabilityHandler = nil
                sema.signal()
            } else {
                errorData.append(newData)
            }
        }

        task.terminationHandler = { task in
            sema.signal()
        }

        defer {
            errorFileHandle.closeFile()
            outputFileHandle.closeFile()
        }

        do {
            try task.run()
        } catch {
            // handle error
        }

        sema.wait()
        sema.wait()
        sema.wait()

        // Use outputData and errorData to do whatever...
    }

}

比赛条件

请注意,

readabilityHandlers
不保证它们在哪个线程上执行。并且
Process
不保证每个
readabilityHandler
在调用
task.terminationHandler
时已完成写入数据。 Process 也不保证 readabilityHandlers 在
waitUntilExit()
返回时完成。
不要建议这种替代方案;我去过那里并且我有T恤。

因此,我使用信号量来确保拼图的所有三块都 100% 完成,然后再继续。

outputData
errorData
是跨线程共享的可变状态,但由于信号量,可以保证在两个
readabilityHandlers
的所有写入完成之前不会从任何一个对象读取任何内容。

采用 Swift 并发

我知道我可以使用

Actor
来包装
outputData
errorData
来满足编译器关于
Mutatation of captured var 
outputData
 in concurrently-executing code.
的错误,并形式化我当前的“手动”防范竞争条件(以牺牲更多的上下文切换开销。)

问题

在 Swift 并发中,线程总是应该向前推进。这意味着没有信号量。因此,即使我将

Process
readabilityHandlers
移至
Actor
,我如何正确“等待”所有三部分都完成?

swift actor nstask swift-concurrency nsfilehandle
1个回答
0
投票

作为一般规则,如果您想要获取信号量,我们将实现延续(例如,

withCheckedContinuation
withCheckedThrowingContinuation
)。然后,您可以同时启动它们,然后
await
这些任务。正如您提到的,我们会谨慎地避免使用信号量。

在这种情况下,我可能会为

AsyncSequence
制作一个
availableData

extension Pipe {
    struct AsyncAvailableData: AsyncSequence {
        typealias Element = Data

        let pipe: Pipe

        func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
            AsyncStream { continuation in
                pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
                    let data = handle.availableData

                    guard !data.isEmpty else {
                        continuation.finish()
                        return
                    }

                    continuation.yield(data)
                }

                continuation.onTermination = { _ in
                    pipe.fileHandleForReading.readabilityHandler = nil
                }
            }.makeAsyncIterator()
        }
    }

    var availableData: AsyncAvailableData { AsyncAvailableData(pipe: self) }
}

然后我会使用

async let
(SE-0317) 同时运行这些:

func doSomeWork() async throws {
    let process = Process()
    process.executableURL = command

    let inputPipe  = Pipe()
    let outputPipe = Pipe()
    let errorPipe  = Pipe()

    process.standardError  = errorPipe
    process.standardOutput = outputPipe
    process.standardInput  = inputPipe

    // optionally, you might want to return whatever non-zero termination status code the process returned

    process.terminationHandler = { process in
        if process.terminationStatus != 0 {
            exit(process.terminationStatus)
        }
    }

    async let outputTask = outputPipe.availableData.joined()
    async let errorTask  = errorPipe.availableData.joined()

    try process.run()

    let outputData = await outputTask
    let errorData = await errorTask

    …
}

地点:

extension Pipe.AsyncAvailableData {
    func joined() async -> Data {
        var data = Data()
        for await availableData in self {
            data.append(availableData)
        }
        return data
    }
}

注意,我创建了一个

Data
的异步序列,表示提供给
availableData
readabilityHandler
。通常,我会创建各个数据字节的
AsyncSequence
,如 https://stackoverflow.com/a/76941591/1271826 所示。这样,如果您想逐行处理它们,您可以免费获得该行为。但是,如果您只是一次性将整个流加载到内存中,则无需添加将
Data
序列转换为字节流,然后再转换回一个大
Data
的开销。


虽然我希望我回答了您上面的问题,但为了未来的读者,我必须注意,尝试将所有

standardOutput
standardError
加载到各自的
Data
中存在严重的缺点。具体来说,您必须同时将完整输出加载到内存中。

我们通常希望在数据传入时对其进行处理,而不是尝试一次将整个数据保存在内存中。在琐碎的用例中,将整个内容加载到 RAM 中效果很好,但随着输入的增加,它变得越来越不实用。买者自负。

© www.soinside.com 2019 - 2024. All rights reserved.