我用 Swift Concurrency 实现了一个作业调度程序。这些工作只是关闭。该调度程序并行处理一定数量的作业并要求其他作业等待。它使用一个参与者来封装所有可变数据。
我设法让它发挥作用。但我觉得非常麻烦。我怎样才能让它变得更好?我可以用不同的方式实现它吗?所有建议都受到热烈欢迎。
class CustomJob {
var completion: () -> ()
init(completion: @escaping () -> Void) {
self.completion = completion
}
}
actor JobQueue {
private var maxRunningCount: Int
private var runningJobCount = 0
private var pendingJobs = [CustomJob]()
init(maxRunningCount: Int) {
self.maxRunningCount = maxRunningCount
}
func addJob(job: CustomJob) {
pendingJobs.append(job)
}
// I found that I need to increment the runningJobCount here.
func nextJob() -> CustomJob? {
if runningJobCount == maxRunningCount {
print("The next job needs to wait")
return nil
}
if runningJobCount < maxRunningCount && pendingJobs.count > 0 {
runningJobCount += 1
return pendingJobs.removeFirst()
} else {
return nil
}
}
func finishOneJob() {
runningJobCount -= 1
}
}
class JobScheduler {
let jobQueue: JobQueue
init(maxRunningCount: Int) {
jobQueue = JobQueue(maxRunningCount: maxRunningCount)
}
func scheduleJob(job: @escaping () -> ()) {
Task {
await jobQueue.addJob(job: CustomJob(completion: job))
run()
}
}
private func run() {
Task {
if let job = await jobQueue.nextJob() {
Task {
await self.executeJob(job: job)
await self.jobQueue.finishOneJob()
run()
}
}
}
}
private func executeJob(job: CustomJob) async {
return await withCheckedContinuation { continuation in
job.completion()
continuation.resume()
}
}
}
我用了派遣组安排了5个工作并进行了测试。
// MARK: - TEST
let processor = JobScheduler(maxRunningCount: 2)
let group = DispatchGroup()
for job in 1...5 {
group.enter()
print("Job \(job) scheduled")
processor.scheduleJob {
print("Job \(job) starts")
sleep(2)
print("Job \(job) complete")
group.leave()
}
}
group.wait()
print("Done")
我将首先说明如何在异步工作的 Swift 并发中执行此操作,然后解释为什么您可能不希望通过同步工作执行此操作。不幸的现实是,正确的解决方案会根据正在启动的工作类型而有所不同。您可能不需要一种一刀切的通用调度程序,但要真正考虑您的应用程序的特定需求,并为您的特定用例选择合适的模式。
那么,首先,我们如何在异步任务的 Swift 并发中做到这一点?
限制 Swift 并发中的并行性。
为了限制 Swift 并发中的并发,我们将使用任务组,但对于高于特定阈值的每次迭代调用
next
:
await withTaskGroup(of: Void.self) { group in
var index = 0
for await work in sequence {
index += 1
if index > 4 { await group.next() }
group.addTask { await work() }
}
}
参见 如何使用 Swift Concurrency 约束并发(如
maxConcurrentOperationCount
)?
事后我们如何向
AsyncSequence
添加工作?
向
AsyncSequence
添加工作的一种简单方法是使用 Swift Async Algorithms包的
AsyncChannel
。例如:
actor Scheduler {
typealias Work = @Sendable () async -> Void
private let channel = AsyncChannel<Work>()
func start() async {
await withTaskGroup(of: Void.self) { group in
var index = 0
for await work in channel {
index += 1
if index > 4 { await group.next() }
group.addTask { await work() }
}
}
}
func addWork(_ work: @escaping Work) async {
await channel.send(work)
}
}
然后我们可以做这样的事情:
struct ContentView: View {
let scheduler = Scheduler()
var body: some View {
VStack {
Button("Channel") {
Task {
await scheduler.addWork {
…
}
}
}
}
.task {
await scheduler.start()
}
}
}
因此,当视图启动时就会启动调度程序,并在每次点击按钮时添加工作。例如,当我在 Instruments 中运行该程序时,每次点击按钮时我都会添加一个路标 ⓢ,并在工作运行时添加一个“间隔”。然后我运行该应用程序,连续点击按钮十次,您可以看到它一次只运行四个任务:
概述了如何在 Swift 并发中使用异步工作来完成此操作,我们应该注意到您的示例是用于处理同步任务的。但我们必须避免在 Swift 并发中调用缓慢的同步函数。 Swift 并发的协作线程池每个 CPU 核心只有一个线程。整个并发系统基于一个契约,永远不会阻碍任何给定线程的前进。
yield
,从而防止 Swift 并发系统陷入死锁。但如果你不能定期 yield
,Apple 明确建议将其移出 swift 并发系统,并建议使用 GCD。 (通常我们建议避免将 GCD 与 Swift 并发混合使用,但这是规则的例外。)请参阅使用 Swift 中的新并发将同步函数转换为异步中的第 3 点。
无论如何,如果您确实想限制缓慢的同步任务的并发性,简单的遗留解决方案是操作队列:
let queue = OperationQueue()
queue.maxConcurrentOperationCount = 4
并且:
queue.addOperation { … }
这是同步工作的约束并行性的简单解决方案,您可以在其中以即时方式启动工作。这会产生完全相同的行为:
concurrentPerform
,但这仅在预先启动固定数量的同步工作项时使用。或者在组合中,我们可以使用flatMap(maxPublishers:transform:)
。有关一些遗留选项的调查,请参阅如何在使用 DispatchSemaphores 时避免线程爆炸?