FS2作为状态传递资源(或效果)

问题描述 投票:1回答:1

我正在尝试实现一个控制摄像机的应用程序。相机命令表示为CameraAction对象的流:

sealed trait CameraMessage
case object Record(recordId: String) extends CameraMessage
case object Stop extends CameraMessage

...

val s = Stream[F, CameraMessage]

假设我有一个测试流,在20秒后发出“记录”并发出“停止”,再过20秒后又发出另一个“记录”消息,依此类推,输入流是无限的。

然后应用程序消耗“记录”,它应该创建GStreamer管道的实例(即它是一种效果)并“运行”它,在“停止”时,它“停止”管道并关闭它。然后在后续的“记录”中,使用新的GStreamer管道重复该模式。

问题是,我需要在流事件的句柄之间传递一个不纯的可变对象的实例。

FS2文档建议使用块使流有状态,所以我尝试了


def record(gStreamerPipeline: String, fileName: String)
(implicit sync: Sync[F]): F[Pipeline] = 
{ 
... create and open pipeline ... 
}

def stopRecording(pipe: Pipeline)(implicit sync: Sync[F]): F[Unit] = {
 ... stop pipeline, release resources ... 
}

def effectPipe(pipelineDef: String)(implicit L: Logger[F]): 
Pipe[F, CameraMessage, F[Unit]] = {
    type CameraSessionHandle = Pipeline
    type CameraStream = Stream[F, CameraSessionHandle]

    s: Stream[F, CameraMessage] =>
      s.scanChunks(Stream[F, CameraSessionHandle]()) {
        case (s: CameraStream, c: Chunk[CameraMessage]) =>
          c.last match {
            case Some(Record(fileName)) =>
              (Stream.bracket(record(pipelineDef, fileName))(p => stopRecording(p)), Chunk.empty)
            case Some(StopRecording) =>
              (Stream.empty, Chunk(s.compile.drain))
            case _ =>
              (s, Chunk.empty)
          }
      }
  }

此代码的问题是,实际的记录不会在'Record'事件中发生,而是会评估整个块的效果,即,当'StopRecording'消息到达时,会打开相机,然后立即将其再次关闭。

我如何不分块地传递“状态”?还是有其他方法可以达到我需要的结果?

这可能类似于FS2 Stream with StateT[IO, _, _], periodically dumping state但是不同之处在于,在我看来,状态不是纯数据结构而是资源。

scala scala-cats fs2 cats-effect
1个回答
2
投票

我最终能够使用https://typelevel.org/blog/2018/06/07/shared-state-in-fp.html中所述的Mutable Reference模式来解决它>

这里是代码:

import cats.effect._
import cats.syntax.all._
import fs2.Stream

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.language.higherKinds

class FRef[F[_], T](implicit sync: Sync[F]) {
  private var state: T = _
  def set(n: T): F[Unit] = sync.delay(this.state = n)
  def get: F[T] = sync.pure(state)
}

object FRef {
  def apply[F[_], T](implicit sync: Sync[F]): F[FRef[F, T]] = sync.delay { new FRef() }
}

class CameraImpl(id: String) extends Camera {

  override def record(): Unit = {
    println(s"Recording $id")
  }

  override def stop(): Unit = {
    println(s"Stopping $id")
  }

  override def free(): Unit = {
    Thread.sleep(500)
    println(s"Freeing $id")
  }
}

object Camera {
  def apply(id: String) = new CameraImpl(id)
}

trait Camera {
  def record(): Unit
  def stop(): Unit
  def free(): Unit
}

sealed trait CameraMessage
case class Record(recordId: String) extends CameraMessage
case object StopRecording extends CameraMessage

class Streamer[F[_]](implicit sync: Sync[F]) {

  def record(id: String): F[Camera] = {
    sync.delay {
      val r = Camera(id)
      r.record()
      r
    }
  }

  def stopRecording(pipe: Camera): F[Unit] = {
    sync.delay {
      pipe.stop()
      pipe.free()
    }
  }

  def effectPipe(state: FRef[F, Option[Camera]])(
      implicit sync: Sync[F]): Stream[F, CameraMessage] => Stream[F, Unit] = {
    type CameraStream = Stream[F, Camera]

    s: Stream[F, CameraMessage] =>
      s.evalMap {
        case Record(fileName) =>
          for {
            r <- record(fileName)
            _ <- state.set(Some(r))
          } yield ()
        case StopRecording =>
          for {
            s <- state.get
            _ <- stopRecording(s.get)
            _ <- state.set(None)
          } yield ()
      }
  }
}

object FS2Problem extends IOApp {
  import scala.concurrent.duration._

  override def run(args: List[String]): IO[ExitCode] = {

    implicit val ec: ExecutionContextExecutor = ExecutionContext.global

    val streamer = new Streamer[IO]

    val s = Stream.awakeEvery[IO](5.seconds).take(10).zipWithIndex.map {
      case (_, idx) =>
        idx % 2 match {
          case 0 =>
            Record(s"Record $idx")
          case _ =>
            StopRecording
        }
    }

    val ss = for {
      streamerState <- Stream.eval(FRef[IO, Option[Camera]])
      s <- s.through(streamer.effectPipe(streamerState))
    } yield ()

    ss.compile.drain.map(_ => ExitCode.Success)
  }
}

© www.soinside.com 2019 - 2024. All rights reserved.