假设我有一个异步函数:
func doAsyncWork() async throws { ... }
该函数可以从多个地方调用。我想要实现的是确保该函数永远不会同时执行多次。例如,如果有人在第一个调用者仍在执行该函数时调用该函数,则它应该将第二个调用排队,直到第一个调用完成(依此类推,并进行适当的错误传播)。必须保留调用的顺序,因此可能需要诸如 FIFO 队列之类的东西来存储尚未处理的调用。
我想避免仅出于此目的使用 RxSwift 等外部库/框架,因此我正在寻找一个普通的 Swift 解决方案。有什么想法吗?
这是我的非人工智能“Q&D”解决方案。 ;)
编辑
此版本修复了问题(请参阅下面的说明)。
在操作方面的更多多功能性方面肯定可以改进,即输入参数和抛出。
func makePublisher<Output>(
operation: @escaping () async -> Output
) -> some Publisher<Output, Never> {
Future { promise in
Task {
let output = await operation()
promise(.success(output))
}
}
}
actor TaskQueue<Value> {
private struct Effect {
private let f: () -> AnyPublisher<Value, Never>
let continuation: CheckedContinuation<Value, Never>
init(
operation: @escaping () async -> Value,
continuation: CheckedContinuation<Value, Never>
) {
self.f = { makePublisher(operation: operation).eraseToAnyPublisher() }
self.continuation = continuation
}
func invoke() -> AnyPublisher<Value, Never> {
f()
}
}
typealias Operation = () async -> Value
typealias Continuation = CheckedContinuation<Value, Never>
private var cancellable: AnyCancellable!
private var input = PassthroughSubject<Effect, Never>()
init(maxTasks: Int = 1, bufferSize: Int = 100) {
self.cancellable = input
.buffer(size: bufferSize, prefetch: .keepFull, whenFull: .customError({ fatalError("queue full") }))
.flatMap(maxPublishers: .max(maxTasks)) { effect in
effect.invoke().map { [continuation = effect.continuation] in ($0, continuation) }
}
.sink { value, continuation in
continuation.resume(returning: value)
}
}
func enqueue(_ operation: @escaping Operation) async -> Value {
await withCheckedContinuation { continuation in
self.send(operation: operation, continuation: continuation)
}
}
private func send(operation: @escaping Operation, continuation: Continuation) {
self.input.send(
Effect(
operation: operation,
continuation: continuation
)
)
}
}
测试:
func testTaskQueue() async throws {
let taskQueue = TaskQueue<Void>()
// Call the function from multiple places
try await withThrowingDiscardingTaskGroup { group in
group.addTask {
print(Date(), "enqueue")
await taskQueue.enqueue {
print(Date(), "0 executing")
try! await Task.sleep(for: .milliseconds(1000))
print(Date(), "0 finished")
}
print(Date(), "0 done")
}
group.addTask {
print(Date(), "enqueue")
await taskQueue.enqueue {
print(Date(), "1 executing")
try! await Task.sleep(for: .milliseconds(1000))
print(Date(), "1 finished")
}
print(Date(), "1 done")
}
group.addTask {
print(Date(), "enqueue")
await taskQueue.enqueue {
print(Date(), "2 executing")
try! await Task.sleep(for: .milliseconds(1000))
print(Date(), "2 finished")
}
print(Date(), "2 done")
}
group.addTask {
print(Date(), "enqueue")
await taskQueue.enqueue {
print(Date(), "3 executing")
try! await Task.sleep(for: .milliseconds(1000))
print(Date(), "3 finished")
}
print(Date(), "3 done")
}
}
print("done")
}
输出
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 0 executing
2024-03-08 13:06:24 +0000 0 finished
2024-03-08 13:06:24 +0000 0 done
2024-03-08 13:06:24 +0000 1 executing
2024-03-08 13:06:26 +0000 1 finished
2024-03-08 13:06:26 +0000 1 done
2024-03-08 13:06:26 +0000 2 executing
2024-03-08 13:06:27 +0000 2 finished
2024-03-08 13:06:27 +0000 2 done
2024-03-08 13:06:27 +0000 3 executing
2024-03-08 13:06:28 +0000 3 finished
2024-03-08 13:06:28 +0000 3 done
done
更新前bug说明
由于
flatMap
受到“最大操作”的限制,我们需要提供一个缓冲区来保存传入的操作。 PassthroughSubject 不会为我们做这件事,当一个新的操作被排队时,挂起的操作将被简单地“覆盖”并在该操作执行之前被删除。这导致了错误“SWIFT TASK CONTINUATION MISUSE”。