顺序执行异步函数的多个调用

问题描述 投票:0回答:1

假设我有一个异步函数:

func doAsyncWork() async throws { ... }

该函数可以从多个地方调用。我想要实现的是确保该函数永远不会同时执行多次。例如,如果有人在第一个调用者仍在执行该函数时调用该函数,则它应该将第二个调用排队,直到第一个调用完成(依此类推,并进行适当的错误传播)。必须保留调用的顺序,因此可能需要诸如 FIFO 队列之类的东西来存储尚未处理的调用。

我想避免仅出于此目的使用 RxSwift 等外部库/框架,因此我正在寻找一个普通的 Swift 解决方案。有什么想法吗?

swift asynchronous
1个回答
1
投票

这是我的非人工智能“Q&D”解决方案。 ;)

编辑

此版本修复了问题(请参阅下面的说明)。

在操作方面的更多多功能性方面肯定可以改进,即输入参数和抛出。


func makePublisher<Output>(
    operation: @escaping () async -> Output
) -> some Publisher<Output, Never> {
    Future { promise in
        Task {
            let output = await operation()
            promise(.success(output))
        }
    }
}


actor TaskQueue<Value> {
    
    private struct Effect {
        private let f: () -> AnyPublisher<Value, Never>
        let continuation: CheckedContinuation<Value, Never>
        
        init(
            operation: @escaping () async -> Value,
            continuation: CheckedContinuation<Value, Never>
        ) {
            self.f = { makePublisher(operation: operation).eraseToAnyPublisher() }
            self.continuation = continuation
        }
        
        func invoke() -> AnyPublisher<Value, Never> {
            f()
        }
    }
    
    typealias Operation = () async -> Value
    typealias Continuation = CheckedContinuation<Value, Never>
    
    private var cancellable: AnyCancellable!
    private var input = PassthroughSubject<Effect, Never>()
        
    init(maxTasks: Int = 1, bufferSize: Int = 100) {
        self.cancellable = input
        .buffer(size: bufferSize, prefetch: .keepFull, whenFull: .customError({ fatalError("queue full") }))
        .flatMap(maxPublishers: .max(maxTasks)) { effect in
            effect.invoke().map { [continuation = effect.continuation] in ($0, continuation) }
        }
        .sink { value, continuation in
            continuation.resume(returning: value)
        }
    }
    
    func enqueue(_ operation: @escaping Operation) async -> Value {
        await withCheckedContinuation { continuation in
            self.send(operation: operation, continuation: continuation)
        }
    }
    
    private func send(operation: @escaping Operation, continuation: Continuation) {
        self.input.send(
            Effect(
                operation: operation,
                continuation: continuation
            )
        )
    }
}

测试:

    func testTaskQueue() async throws {
        let taskQueue = TaskQueue<Void>()
                
        // Call the function from multiple places

        try await withThrowingDiscardingTaskGroup { group in
            group.addTask {
                print(Date(), "enqueue")
                await taskQueue.enqueue {
                    print(Date(), "0 executing")
                    try! await Task.sleep(for: .milliseconds(1000))
                    print(Date(), "0 finished")
                }
                print(Date(), "0 done")
            }
            group.addTask {
                print(Date(), "enqueue")
                await taskQueue.enqueue {
                    print(Date(), "1 executing")
                    try! await Task.sleep(for: .milliseconds(1000))
                    print(Date(), "1 finished")
                }
                print(Date(), "1 done")
            }
            group.addTask {
                print(Date(), "enqueue")
                await taskQueue.enqueue {
                    print(Date(), "2 executing")
                    try! await Task.sleep(for: .milliseconds(1000))
                    print(Date(), "2 finished")
                }
                print(Date(), "2 done")
            }
            group.addTask {
                print(Date(), "enqueue")
                await taskQueue.enqueue {
                    print(Date(), "3 executing")
                    try! await Task.sleep(for: .milliseconds(1000))
                    print(Date(), "3 finished")
                }
                print(Date(), "3 done")
            }
        }
        
        print("done")
    }

输出

2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 enqueue
2024-03-08 13:06:23 +0000 0 executing
2024-03-08 13:06:24 +0000 0 finished
2024-03-08 13:06:24 +0000 0 done
2024-03-08 13:06:24 +0000 1 executing
2024-03-08 13:06:26 +0000 1 finished
2024-03-08 13:06:26 +0000 1 done
2024-03-08 13:06:26 +0000 2 executing
2024-03-08 13:06:27 +0000 2 finished
2024-03-08 13:06:27 +0000 2 done
2024-03-08 13:06:27 +0000 3 executing
2024-03-08 13:06:28 +0000 3 finished
2024-03-08 13:06:28 +0000 3 done
done

更新前bug说明

由于

flatMap
受到“最大操作”的限制,我们需要提供一个缓冲区来保存传入的操作。 PassthroughSubject 不会为我们做这件事,当一个新的操作被排队时,挂起的操作将被简单地“覆盖”并在该操作执行之前被删除。这导致了错误“SWIFT TASK CONTINUATION MISUSE”。

© www.soinside.com 2019 - 2024. All rights reserved.