带有函数式编程的ConcurrentHashMap。暂停unsafeRun安全吗?

问题描述 投票:3回答:1

问题:用unsafeRunSync暂停IO是否安全?例如。

val io: IO[Unit] = //...
val io2: IO[Unit] = IO(io.unsafeRunSync)

我这样做的原因是我有一些用F[_]: Effect参数化的类,就像一个缓存:

import cats.effect.Effect

final class MyChache[F[_]](implicit F: Effect[F]) {
  private val cache = new ConcurrentHashMap[Int, String]

  def getOrCreate(key: Int): F[String] = F delay {
    cache.computeIfAbsent(
      key, 
      k => longRunningEffecfulComputation(k).toIO.unsafeRunSync() // <-- Here
    )
  }
}


object MyCache {
  def longRunningEffecfulComputation[F[_] : Effect](key: Int): F[String] = {
    //...
  }
}

关键是我想为每个键只运行一次这个长时间运行的有效计算(很少见)。但是我想在检索现有密钥时保持非阻塞状态。

ConcurrentHashMap似乎是一个完美的选择,但它需要这个丑陋的技巧,运行和暂停效果。还有更好的方法吗?

scala concurrency functional-programming scala-cats
1个回答
1
投票

这至少可能是不安全的。假设您的长时间运行计算使用固定大小的线程池:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import cats.effect.Async

object MyCache {
  val smallThreadPool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

  def longRunningEffectfulComputation[F[_] : Effect](key: Int): F[String] = {
    Effect[F].flatMap(Async.shift[F](smallThreadPool))(_ => Effect[F].delay("test"))
  }
}

并且您的缓存在同一个线程池中使用:

val io = for {
  _ <- IO.shift(MyCache.smallThreadPool)
  x <- new MyCache[IO].getOrCreate(1)
} yield x

当你打电话给io.unsafeRunSync()时,你会发现它没有终止。

相反,您可以使用支持cat-effect的缓存api,如ScalaCache

© www.soinside.com 2019 - 2024. All rights reserved.