为了将对公共操作的访问限制为有限数量的线程,我们可以使用 Foundation 的 DispatchSemaphore。 例如:
func someFunction(operation: @escaping () -> Void) {
DispatchQueue.global().async {
self.semaphore.wait()
operation()
self.semaphore.signal()
}
}
let semaphore = DispatchSemaphore(value: 5)
someFunction(1) {
print("Exexuting add operation")
}
someFunction(2) {
print("Exexuting add operation")
}
.
.
.
someFunction(n) //64 times
这里的问题是信号量允许5个并发操作,但是现在其他线程也被调度,并且被调度的线程必须等待,直到前面5个线程中的一些线程完成他们的任务。这将导致线程利用率的浪费。另外,DispatchQueue 的线程数上限为 64,因此系统会耗尽线程,这也称为线程爆炸。
我们如何避免这里的线程爆炸并仅分派适量的线程来执行而不被阻塞?
一些观察:
为了避免使用信号量模式时的线程爆炸问题,您需要
wait
before 分派到并发队列。因此,我将创建一个“调度程序”队列来管理所有这些添加到某些“处理器”队列中的作业:
let schedulerQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".scheduler.queue")
let processorQueue = DispatchQueue(label: Bundle.main.bundleIdentifier! + ".process.queue", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 5)
func start(block: @escaping () -> Void) {
schedulerQueue.async {
semaphore.wait()
processorQueue.async {
block()
semaphore.signal()
}
}
}
如果由
block
发起的工作本身是异步的(例如网络请求),那么您只需将 semaphore.signal()
移动到该异步工作项的完成处理程序中即可。
虽然上面回答了信号量问题,但我根本不建议使用信号量。考虑使用
OperationQueue
,它有一个控制并发程度的机制,即maxConcurrentOperationCount
:
let processorQueue = OperationQueue()
processorQueue.name = Bundle.main.bundleIdentifier! + ".process.queue"
processorQueue.maxConcurrentOperationCount = 5
func start(block: @escaping () -> Void) {
processorQueue.addOperation {
block()
}
}
并且,如果与此操作相关的工作本身是异步的,那么您将实现一个自定义
Operation
子类,它允许您管理异步任务之间的依赖关系(例如在 尝试理解异步操作子类中所设想的) )。
解决此问题的另一种方法是使用合并的
maxPublishers
模式,如组合框架序列化异步操作中所述。
另一种方式是 Swift 并发。例如,您可能有一个
AsyncChannel
来管理传入的任务队列,以及一个任务组来约束并发程度(如 使 Swift 并发中的任务串行运行 的后半部分所述)。