我创建了一个微型工作系统来以最大的多核处理器利用率运行并行作业。 它似乎工作正常,但在某些时候,当处理大量作业时,会出现错误(没有错误消息,只是挂起),我怀疑这是低级竞争条件。我无法确定这是否是我用来实现并行性的 cats-effect 的错误,还是 Atomic 或 TrieMap 的错误。
这是一个简化的实现,可用于说明和测试问题:
import cats.effect.IO
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.concurrent.TrieMap
import cats.effect.unsafe.implicits.global
import java.util.concurrent.ConcurrentHashMap
object ThreadingError extends App:
val jobIdsAdded = (0L until 10000L).toList
for (_ <- jobIdsAdded.iterator) {
ParallelJobs.addJob(() => {})
}
while(ParallelJobs.count.get() < 10000L) {
print(s"${ParallelJobs.count.get()}\r")
Thread.sleep(200)
}
object ParallelJobs:
private val allCores = Runtime.getRuntime.availableProcessors()
private val availableCores = allCores - 1
private val assignedTillJobId: AtomicLong = AtomicLong(0L)
val jobsTrieMap: TrieMap[Long, () => Any] = TrieMap.empty[Long, () => Any]
val jobsConcurrentHashMap: ConcurrentHashMap[Long, () => Any] = ConcurrentHashMap[Long, () => Any]()
val locked: AtomicBoolean = AtomicBoolean(false)
val count: AtomicLong = AtomicLong(0L)
workerGroup
.unsafeRunAsync(either => {
if (either.isLeft)
println(either.left.get.getMessage)
either.left.get.printStackTrace()
})
def addJob(jobFn: () => Any): Unit =
val jobId = jobsTrieMap.size
jobsTrieMap(jobId) = jobFn
//val jobId = jobsConcurrentHashMap.size()
//jobsConcurrentHashMap.put(jobId, jobFn)
private def workerGroup: IO[Unit] = (0 until availableCores).map(_ => worker).reduce(_ &> _)
private def worker: IO[Unit] =
IO({
while (true) {
if (!locked.get() && jobsTrieMap.nonEmpty)
//if (!locked.get() && !jobsConcurrentHashMap.isEmpty)
locked.set(true)
val jobId = assignedTillJobId.getAndIncrement()
val toDo = jobsTrieMap(jobId)
//val toDo = jobsConcurrentHashMap.get(jobId)
jobsTrieMap -= jobId
//jobsConcurrentHashMap.remove(jobId)
locked.set(false)
toDo() // long running job
count.incrementAndGet()
else
Thread.sleep(100)
}
})
如你所见,我也尝试了ConcurrentHashMap;使用它只是停止应用程序的运行。 我想出了一些锁定机制来测试问题是否是由多个工作人员尝试编写 TrieMap 引起的,但这也没有帮助。
我使用Scala 3.3和cats-effect 3.5.1
这段代码似乎不知道
IO
类型的概念应该如何工作。
Scala 上所有这些 IO 类型(Future、Cats Effect 的 IO、ZIO、Monix 的 Task...)背后的想法是,您编写某种声明性代码(并且肯定比单独的
Thread
和锁级别更高),然后将其发送给一个有 Scheduler
的跑步者。由于所有代码都应该使用以下内容构建:map
、flatMap
、recover
、traverse
以及其他组合,因此假设 IO 内的每个单独的不间断部分是:
它允许调度程序:
这意味着:
Thread.sleep
- 有指令让光纤休眠而不阻塞线程池中的整个Thread
(并且可能会通过天真地操作sleep
来阻塞整个线程池)while(true)
- 因为它劫持了整个Thread
并且使得无法取消操作(并且wRef
、Semaphore
等)表示中指。这并不意味着绝对不可能将它们与 IO monad 一起使用,只是这些是为了轻松构建并发应用程序而构建的......只要您使用它们,因为它们希望您不使用它们不要打破它们所建立的假设(类似于如果您认为
Thread
固定不是一回事并开始运行您自己的东西,那么您可能会在 Spring Framework + Hibernate 中获得糟糕的体验)。因此,我建议您等到了解运行时在底层如何工作后再进行底层操作。
同时,我建议将整个代码重写为使用内置组合器和操作的代码:
import cats.effect.{IO, IOApp}
import cats.syntax._
object ThreadingError extends IOApp.Simple {
val run: IO[Unit] = (0L until 10000L)
.toList
// will be run in parallel!
.parTraverse { number =>
// sleep 2s then print
IO.sleep(2000) >> IO {
println(i)
}
}
.void // makes result Unit
}
我创建了一个微型工作系统来运行并行作业,并最大限度地提高多核处理器利用率
所有副作用的 monad:
scala.concurrent.Future
、cats.effect.IO
、monix.eval.Task
、zio.ZIO
都有一个默认的 Scheduler
,它使用 Runtime.getRuntime.availableProcessors()
来决定 Thread
池的大小。他们还知道如何处理阻塞(以声明方式定义),以防止所有线程陷入“等待”状态,在这种状态下,可以“通知”它们的作业无法进入线程池。
这意味着您的整个代码可以只用一些
parTraverse
或其他内容替换,以获得相同的结果,但没有竞争条件。
如果你想非常控制并行度的大小,你可以使用:
IO.parSequenceN(listOfIOs)(sizeOfParallelism)
fs2.Stream
.fromIterator[IO]((1 until 100).iterator, chunkSize)
.mapAsync(sizeOfParallelism) { int =>
IO.sleep(2000) >> IO {
println(i)
}
}
长话短说:不要推出自己的低级解决方案(直到你了解库在幕后的作用),那就没问题了。有足够多的助手可以在库的 API 之上推出您自己的解决方案,从而更轻松、更快地交付同样的事情,并且不会出现竞争条件或其他问题。