推动元件外部到在fs2的反应性流

问题描述 投票:4回答:2

我有一个外部(即,我不能改变它)的Java API它是这样的:

public interface Sender {
    void send(Event e);
}

我需要实现其接受每个事件,将其转换为一个JSON对象,收集他们的一些号码为单个束,并发送通过HTTP一些端点的Sender。这一切都应该异步进行的,没有send()阻塞调用线程,与一些固定大小的缓冲区和丢弃新事件,如果缓冲区已满。

随着阿卡流,这是很简单的:我创建阶段的图表(使用阿卡-HTTP发送HTTP请求),具体化,并使用物化ActorRef推新事件流:

lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
  .via(CustomBuffer(bufferSize))  // buffer all events
  .groupedWithin(batchSize, flushDuration)  // group events into chunks
  .map(toBundle)  // convert each chunk into a JSON message
  .mapAsyncUnordered(1)(sendHttpRequest)  // send an HTTP request
  .toMat(Sink.foreach { response =>
    // print HTTP response for debugging
  })(Keep.both)

lazy val (eventsActor, completeFuture) = eventPipeline.run()

override def send(e: Event): Unit = {
  eventsActor ! e
}

这里CustomBuffer是一个自定义GraphStage这是非常相似的库提供的Buffer但适合我们的具体需求;它可能并不重要这个特殊的问题。

正如你所看到的,与非流码流的交互是非常简单 - 在!性状ActorRef方法是异步的,不需要任何额外的机械被调用。然后通过反应性整个管道处理的每个被发送到演员事件。此外,因为如何阿卡-HTTP实现,我甚至得到连接池免费的,所以不超过一个打开连接到服务器。

不过,我不能找到一种方法,做同样的事情与FS2正常。即使丢弃缓冲的问题(我可能需要编写一个自定义Pipe实现这确实是我们需要更多的东西)和HTTP连接池,我还是坚持了一个更基本的东西 - 那就是,如何将数据推到反应性流“来自外部的”。

所有教程和文档,我可以找到假设整个程序有一定的影响上下文中发生,通常IO。这不是我的情况 - send()方法是由Java库在未指定的时间调用。因此,我不能把一切都一个IO动作里面,我一定要完成的send()方法里面的“推”的动作,并有反应流作为一个独立的实体,因为我要聚集事件,并希望池HTTP连接(这我相信自然联系在一起的反应流)。

我认为我需要一些额外的数据结构,像Queue。 FS2确实有某种fs2.concurrent.Queue的,但同样的,所有的文件显示了如何使用一个单一IO上下文中,所以我认为做这样的事情

val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()

然后使用queue流定义内,然后分别与进一步send()呼叫unsafeRun方法内:

val eventPipeline = queue.dequeue
  .through(customBuffer(bufferSize))
  .groupWithin(batchSize, flushDuration)
  .map(toBundle)
  .mapAsyncUnordered(1)(sendRequest)
  .evalTap(response => ...)
  .compile
  .drain

eventPipeline.unsafeRunAsync(...)  // or something

override def send(e: Event) {
  queue.enqueue(e).unsafeRunSync()
}

是不是正确的方式,最有可能甚至不工作。

所以,我的问题是,我该如何正确使用FS2解决我的问题?

scala akka-stream reactive-streams fs2
2个回答
1
投票

我没有太多的经验,正是库,但它应该以某种方式那样:

import cats.effect.{ExitCode, IO, IOApp}
import fs2.concurrent.Queue

case class Event(id: Int)

class JavaProducer{

  new Thread(new Runnable {
    override def run(): Unit = {
      var id = 0
      while(true){
        Thread.sleep(1000)
        id += 1
        send(Event(id))
      }
    }
  }).start()

  def send(event: Event): Unit ={
    println(s"Original producer prints $event")
  }
}

class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
  override def send(event: Event): Unit = {
    println(s"Hacked producer pushes $event")
    queue.enqueue1(event).unsafeRunSync()
    println(s"Hacked producer pushes $event - Pushed")
  }

}

object Test extends IOApp{
  override def run(args: List[String]): IO[ExitCode] = {
    val x: IO[Unit] = for {
      queue <- Queue.unbounded[IO, Event]
      _ = new HackedProducer(queue)
      done <- queue.dequeue.map(ev => {
        println(s"Got $ev")
      }).compile.drain
    } yield done
    x.map(_ => ExitCode.Success)
  }

}

1
投票

请看下面的例子:

import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Answer {
  type Event = String

  trait Sender {
    def send(event: Event): Unit
  }

  def main(args: Array[String]): Unit = {
    val sender: Sender = {
      val ec = ExecutionContext.global
      implicit val cs: ContextShift[IO] = IO.contextShift(ec)
      implicit val timer: Timer[IO] = IO.timer(ec)

      fs2Sender[IO](2)
    }

    val events = List("a", "b", "c", "d")
    events.foreach { evt => new Thread(() => sender.send(evt)).start() }
    Thread sleep 3000
  }

  def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
    // dummy impl
    // this is where the actual logic for batching
    //   and shipping over the network would live
    val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
      for {
        _ <- F.delay { println(s"consuming [$event]...") }
        _ <- Timer[F].sleep(1.seconds)
        _ <- F.delay { println(s"...[$event] consumed") }
      } yield ()
    }

    val suspended = for {
      q <- Queue.bounded[F, Event](maxBufferedSize)
      _ <- q.dequeue.through(consume).compile.drain.start
      sender <- F.delay[Sender] { evt =>
        val enqueue = for {
          wasEnqueued <- q.offer1(evt)
          _ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
        } yield ()
        enqueue.toIO.unsafeRunAsyncAndForget()
      }
    } yield sender

    suspended.toIO.unsafeRunSync()
  }
}

其主要思想是使用并发队列从FS2。注意,上面的代码表明,无论是Sender接口也不在main逻辑可以被改变。只有Sender接口的实现可以被交换出去。

© www.soinside.com 2019 - 2024. All rights reserved.