使用 Swift Concurrency 实现的 JobScheduler 需要改进

问题描述 投票:0回答:1

我用 Swift Concurrency 实现了一个作业调度程序。这些工作只是关闭。该调度程序并行处理一定数量的作业并要求其他作业等待。它使用一个参与者来封装所有可变数据。

我设法让它发挥作用。但我觉得非常麻烦。我怎样才能让它变得更好?我可以用不同的方式实现它吗?所有建议都受到热烈欢迎。

class CustomJob {
    var completion: () -> ()
    init(completion: @escaping () -> Void) {
        self.completion = completion
    }
}

actor JobQueue {
    private var maxRunningCount: Int
    private var runningJobCount = 0
    private var pendingJobs = [CustomJob]()
    
    init(maxRunningCount: Int) {
        self.maxRunningCount = maxRunningCount
    }
    
    func addJob(job: CustomJob) {
        pendingJobs.append(job)
    }
    
    // I found that I need to increment the runningJobCount here.
    func nextJob() -> CustomJob? {
        if runningJobCount == maxRunningCount {
            print("The next job needs to wait")
            return nil
        }
        if runningJobCount < maxRunningCount && pendingJobs.count > 0 {
            runningJobCount += 1
            return pendingJobs.removeFirst()
        } else {
            return nil
        }
    }
    
    
    func finishOneJob() {
        runningJobCount -= 1
    }
}

class JobScheduler {
    
    let jobQueue: JobQueue
    
    init(maxRunningCount: Int) {
        jobQueue = JobQueue(maxRunningCount: maxRunningCount)
    }
    
    func scheduleJob(job: @escaping () -> ()) {
        Task {
            await jobQueue.addJob(job: CustomJob(completion: job))
            run()
        }
    }
    
    private func run() {
        Task {
            if let job = await jobQueue.nextJob() {
                Task {
                    await self.executeJob(job: job)
                    await self.jobQueue.finishOneJob()
                    run()
                }
            }
        }
    }
    
    private func executeJob(job: CustomJob) async {
        return await withCheckedContinuation { continuation in
            job.completion()
            continuation.resume()
        }
    }
}

我用了派遣组安排了5个工作并进行了测试。

// MARK: - TEST

let processor = JobScheduler(maxRunningCount: 2)
let group = DispatchGroup()

for job in 1...5 {
    group.enter()
    print("Job \(job) scheduled")
    processor.scheduleJob {
        print("Job \(job) starts")
        sleep(2)
        print("Job \(job) complete")
        group.leave()
    }
}

group.wait()
print("Done")

swift swift-concurrency
1个回答
0
投票

我将首先说明如何在异步工作的 Swift 并发中执行此操作,然后解释为什么您可能希望通过同步工作执行此操作。不幸的现实是,正确的解决方案会根据正在启动的工作类型而有所不同。您可能不需要一种一刀切的通用调度程序,但要真正考虑您的应用程序的特定需求,并为您的特定用例选择合适的模式。


那么,首先,我们如何在异步任务的 Swift 并发中做到这一点?

  1. 限制 Swift 并发中的并行性。

    为了限制 Swift 并发中的并发,我们将使用任务组,但对于高于特定阈值的每次迭代调用

    next

    await withTaskGroup(of: Void.self) { group in
        var index = 0
        for await work in sequence {
            index += 1
            if index > 4 { await group.next() }
    
            group.addTask { await work() }
        }
    }
    

    参见 如何使用 Swift Concurrency 约束并发(如

    maxConcurrentOperationCount
    )?

  2. 事后我们如何向

    AsyncSequence
    添加工作?

    AsyncSequence
    添加工作的一种简单方法是使用 Swift Async Algorithms
     包的 
    AsyncChannel。例如:

    actor Scheduler {
        typealias Work = @Sendable () async -> Void
    
        private let channel = AsyncChannel<Work>()
    
        func start() async {
            await withTaskGroup(of: Void.self) { group in
                var index = 0
                for await work in channel {
                    index += 1
                    if index > 4 { await group.next() }
    
                    group.addTask { await work() }
                }
            }
        }
    
        func addWork(_ work: @escaping Work) async {
            await channel.send(work)
        }
    }
    

    然后我们可以做这样的事情:

    struct ContentView: View {
        let scheduler = Scheduler()
    
        var body: some View {
            VStack {
                Button("Channel") {
                    Task {
                        await scheduler.addWork {
                            …
                        }
                    }
                }
            }
            .task {
                await scheduler.start()
            }
        }
    }
    

    因此,当视图启动时就会启动调度程序,并在每次点击按钮时添加工作。例如,当我在 Instruments 中运行该程序时,每次点击按钮时我都会添加一个路标 ⓢ,并在工作运行时添加一个“间隔”。然后我运行该应用程序,连续点击按钮十次,您可以看到它一次只运行四个任务:

    enter image description here


概述了如何在 Swift 并发中使用异步工作来完成此操作,我们应该注意到您的示例是用于处理同步任务的。但我们必须避免在 Swift 并发中调用缓慢的同步函数。 Swift 并发的协作线程池每个 CPU 核心只有一个线程。整个并发系统基于一个契约,永远不会阻碍任何给定线程的前进。

有两种可能的解决方案。如果这是一些计算任务,您可以定期

yield
,从而防止 Swift 并发系统陷入死锁。但如果你不能定期
yield
,Apple 明确建议将其移出 swift 并发系统,并建议使用 GCD。 (通常我们建议避免将 GCD 与 Swift 并发混合使用,但这是规则的例外。)请参阅使用 Swift 中的新并发将同步函数转换为异步中的第 3 点。


无论如何,如果您确实想限制缓慢的同步任务的并发性,简单的遗留解决方案是操作队列

let queue = OperationQueue()
queue.maxConcurrentOperationCount = 4

并且:

queue.addOperation { … }

这是同步工作的约束并行性的简单解决方案,您可以在其中以即时方式启动工作。这会产生完全相同的行为:

enter image description here

另一种传统方法是 GCD 的

concurrentPerform
,但这仅在预先启动固定数量的同步工作项时使用。或者在组合中,我们可以使用
flatMap(maxPublishers:transform:)
。有关一些遗留选项的调查,请参阅如何在使用 DispatchSemaphores 时避免线程爆炸?

© www.soinside.com 2019 - 2024. All rights reserved.