[我试图了解猫的效果Cancelable
的工作原理。我有以下基于documentation
import java.util.concurrent.{Executors, ScheduledExecutorService}
import cats.effect._
import cats.implicits._
import scala.concurrent.duration._
object Main extends IOApp {
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[Unit] = {
IO.cancelable { cb =>
val r = new Runnable {
def run() =
cb(Right(()))
}
val f = sc.schedule(r, d.length, d.unit)
// Returning the cancellation token needed to cancel
// the scheduling and release resources early
val mayInterruptIfRunning = false
IO(f.cancel(mayInterruptIfRunning)).void
}
}
override def run(args: List[String]): IO[ExitCode] = {
val scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor()
for {
x <- delayedTick(1.second)(scheduledExecutorService)
_ <- IO(println(s"$x"))
} yield ExitCode.Success
}
}
当我运行此命令时:
❯ sbt run
[info] Loading global plugins from /Users/ethan/.sbt/1.0/plugins
[info] Loading settings for project stackoverflow-build from plugins.sbt ...
[info] Loading project definition from /Users/ethan/IdeaProjects/stackoverflow/project
[info] Loading settings for project stackoverflow from build.sbt ...
[info] Set current project to cats-effect-tutorial (in build file:/Users/ethan/IdeaProjects/stackoverflow/)
[info] Compiling 1 Scala source to /Users/ethan/IdeaProjects/stackoverflow/target/scala-2.12/classes ...
[info] running (fork) Main
[info] ()
该程序此时已挂起。我有很多问题:
mayInterruptIfRunning = false
?取消要中断正在运行的任务不是全部吗?ScheduledExecutorService
的推荐方法吗?我没有在文档中看到示例。()
(然后意外挂起)。如果我想退还其他东西怎么办?例如,假设我想返回一个字符串,这是一些长时间运行的计算的结果。如何从IO.cancelable
中提取该值?似乎困难在于IO.cancelable
返回取消操作,而不是要取消的过程的返回值。请原谅,但这是我的build.sbt
:
name := "cats-effect-tutorial"
version := "1.0"
fork := true
scalaVersion := "2.12.8"
libraryDependencies += "org.typelevel" %% "cats-effect" % "1.3.0" withSources() withJavadoc()
scalacOptions ++= Seq(
"-feature",
"-deprecation",
"-unchecked",
"-language:postfixOps",
"-language:higherKinds",
"-Ypartial-unification")
我仍然可以找到这些问题的答案,尽管有些事情我还是不理解。
为什么程序挂起而不是在1秒后终止?
由于某种原因,Executors.newSingleThreadScheduledExecutor()
导致事物挂起。要解决此问题,我必须使用Executors.newSingleThreadScheduledExecutor(new Thread(_))
。似乎唯一的区别是,第一个版本等效于Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory())
,尽管docs中没有任何内容清楚说明了为什么是这种情况。
我们为什么设置
mayInterruptIfRunning = false
?取消要中断正在运行的任务不是全部吗?
我不得不承认我不完全理解这一点。同样,the docs在这一点上并没有特别明确。至少在true
中断的情况下,将标志切换为Ctrl-c
似乎根本不会改变其行为。
这是定义ScheduledExecutorService的推荐方法吗?我没有在文档中看到示例。
显然不是。我提出的方法是从cats effect源代码中的this snippet大致上受到启发的。
此程序等待1秒钟,然后返回
()
(然后意外挂起)。如果我想退还其他东西怎么办?例如,假设我想返回一个字符串,这是一些长时间运行的计算的结果。如何从IO.cancelable
中提取该值?似乎困难在于IO.cancelable
返回取消操作,而不是要取消的过程的返回值。
IO.cancellable { ... }
块返回IO[A]
,并且回调cb
函数的类型为Either[Throwable, A] => Unit
。从逻辑上讲,这表明IO.cancellable表达式将返回的内容(包装在cb
中)将被馈送到IO
函数中。因此,要返回字符串"hello"
而不是()
,我们重写delayedTick
:
def delayedTick(d: FiniteDuration)
(implicit sc: ScheduledExecutorService): IO[String] = { // Note IO[String] instead of IO[Unit]
implicit val processRunner: JVMProcessRunner[IO] = new JVMProcessRunner
IO.cancelable[String] { cb => // Note IO.cancelable[String] instead of IO[Unit]
val r = new Runnable {
def run() =
cb(Right("hello")) // Note "hello" instead of ()
}
val f: ScheduledFuture[_] = sc.schedule(r, d.length, d.unit)
IO(f.cancel(true))
}
}