我编写了简单的callback(handler)函数,该函数传递给异步api,我想等待结果:
object Handlers {
val logger: Logger = Logger("Handlers")
implicit val cs: ContextShift[IO] =
IO.contextShift(ExecutionContext.Implicits.global)
class DefaultHandler[A] {
val response: IO[MVar[IO, A]] = MVar.empty[IO, A]
def onResult(obj: Any): Unit = {
obj match {
case obj: A =>
println(response.flatMap(_.tryPut(obj)).unsafeRunSync())
println(response.flatMap(_.isEmpty).unsafeRunSync())
case _ => logger.error("Wrong expected type")
}
}
def getResponse: A = {
response.flatMap(_.take).unsafeRunSync()
}
}
但是由于某些原因,tryPut和isEmpty(当我手动调用onResult方法时)都返回true,因此,当我调用getResponse时,它将永远休眠。这是我的测试:
class HandlersTest extends FunSuite {
test("DefaultHandler.test") {
val handler = new DefaultHandler[Int]
handler.onResult(3)
val response = handler.getResponse
assert(response != 0)
}
}
有人可以解释为什么tryPut返回true,但是没有放入。在scala中使用Mvar /通道的正确方法是什么?
IO[X]
表示您具有创建某些X的配方。因此,在您的示例中,yuo正在放入一个MVar,然后再询问另一个。
这是我要怎么做。
object Handlers {
trait DefaultHandler[A] {
def onResult(obj: Any): IO[Unit]
def getResponse: IO[A]
}
object DefaultHandler {
def apply[A : ClassTag]: IO[DefaultHandler[A]] =
MVar.empty[IO, A].map { response =>
new DefaultHandler[A] {
override def onResult(obj: Any): IO[Unit] = obj match {
case obj: A =>
for {
r1 <- response.tryPut(obj)
_ <- IO(println(r1))
r2 <- response.isEmpty
_ <- IO(println(r2))
} yield ()
case _ =>
IO(logger.error("Wrong expected type"))
}
override def getResponse: IO[A] =
response.take
}
}
}
}
“不安全”有点暗示,但是每次调用unsafeRunSync
时,您基本上应该将其视为一个全新的宇宙。在拨打电话之前,您只能描述将要发生的情况,而实际上不能进行任何更改。在通话期间,所有更改都会发生。调用完成后,该Universe将被销毁,您可以读取结果,但不再更改任何内容。一个unsafeRunSync
宇宙中发生的事情不会影响另一个。
您需要在测试代码中将其命名为恰好一次。这意味着您的测试代码需要类似于:
val test = for {
handler <- TestHandler.DefaultHandler[Int]
_ <- handler.onResult(3)
response <- handler.getResponse
} yield response
assert test.unsafeRunSync() == 3
请注意,这并不能直接使用MVar
带来很多收益。我认为您正在尝试混合IO
内部和外部的副作用,但这不起作用。所有副作用都必须在内部。