我有一个外部(即,我不能改变它)的Java API它是这样的:
public interface Sender {
void send(Event e);
}
我需要实现其接受每个事件,将其转换为一个JSON对象,收集他们的一些号码为单个束,并发送通过HTTP一些端点的Sender
。这一切都应该异步进行的,没有send()
阻塞调用线程,与一些固定大小的缓冲区和丢弃新事件,如果缓冲区已满。
随着阿卡流,这是很简单的:我创建阶段的图表(使用阿卡-HTTP发送HTTP请求),具体化,并使用物化ActorRef
推新事件流:
lazy val eventPipeline = Source.actorRef[Event](Int.MaxValue, OverflowStrategy.fail)
.via(CustomBuffer(bufferSize)) // buffer all events
.groupedWithin(batchSize, flushDuration) // group events into chunks
.map(toBundle) // convert each chunk into a JSON message
.mapAsyncUnordered(1)(sendHttpRequest) // send an HTTP request
.toMat(Sink.foreach { response =>
// print HTTP response for debugging
})(Keep.both)
lazy val (eventsActor, completeFuture) = eventPipeline.run()
override def send(e: Event): Unit = {
eventsActor ! e
}
这里CustomBuffer
是一个自定义GraphStage
这是非常相似的库提供的Buffer
但适合我们的具体需求;它可能并不重要这个特殊的问题。
正如你所看到的,与非流码流的交互是非常简单 - 在!
性状ActorRef
方法是异步的,不需要任何额外的机械被调用。然后通过反应性整个管道处理的每个被发送到演员事件。此外,因为如何阿卡-HTTP实现,我甚至得到连接池免费的,所以不超过一个打开连接到服务器。
不过,我不能找到一种方法,做同样的事情与FS2正常。即使丢弃缓冲的问题(我可能需要编写一个自定义Pipe
实现这确实是我们需要更多的东西)和HTTP连接池,我还是坚持了一个更基本的东西 - 那就是,如何将数据推到反应性流“来自外部的”。
所有教程和文档,我可以找到假设整个程序有一定的影响上下文中发生,通常IO
。这不是我的情况 - send()
方法是由Java库在未指定的时间调用。因此,我不能把一切都一个IO
动作里面,我一定要完成的send()
方法里面的“推”的动作,并有反应流作为一个独立的实体,因为我要聚集事件,并希望池HTTP连接(这我相信自然联系在一起的反应流)。
我认为我需要一些额外的数据结构,像Queue
。 FS2确实有某种fs2.concurrent.Queue
的,但同样的,所有的文件显示了如何使用一个单一IO
上下文中,所以我认为做这样的事情
val queue: Queue[IO, Event] = Queue.unbounded[IO, Event].unsafeRunSync()
然后使用queue
流定义内,然后分别与进一步send()
呼叫unsafeRun
方法内:
val eventPipeline = queue.dequeue
.through(customBuffer(bufferSize))
.groupWithin(batchSize, flushDuration)
.map(toBundle)
.mapAsyncUnordered(1)(sendRequest)
.evalTap(response => ...)
.compile
.drain
eventPipeline.unsafeRunAsync(...) // or something
override def send(e: Event) {
queue.enqueue(e).unsafeRunSync()
}
是不是正确的方式,最有可能甚至不工作。
所以,我的问题是,我该如何正确使用FS2解决我的问题?
我没有太多的经验,正是库,但它应该以某种方式那样:
import cats.effect.{ExitCode, IO, IOApp}
import fs2.concurrent.Queue
case class Event(id: Int)
class JavaProducer{
new Thread(new Runnable {
override def run(): Unit = {
var id = 0
while(true){
Thread.sleep(1000)
id += 1
send(Event(id))
}
}
}).start()
def send(event: Event): Unit ={
println(s"Original producer prints $event")
}
}
class HackedProducer(queue: Queue[IO, Event]) extends JavaProducer {
override def send(event: Event): Unit = {
println(s"Hacked producer pushes $event")
queue.enqueue1(event).unsafeRunSync()
println(s"Hacked producer pushes $event - Pushed")
}
}
object Test extends IOApp{
override def run(args: List[String]): IO[ExitCode] = {
val x: IO[Unit] = for {
queue <- Queue.unbounded[IO, Event]
_ = new HackedProducer(queue)
done <- queue.dequeue.map(ev => {
println(s"Got $ev")
}).compile.drain
} yield done
x.map(_ => ExitCode.Success)
}
}
请看下面的例子:
import cats.implicits._
import cats.effect._
import cats.effect.implicits._
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
object Answer {
type Event = String
trait Sender {
def send(event: Event): Unit
}
def main(args: Array[String]): Unit = {
val sender: Sender = {
val ec = ExecutionContext.global
implicit val cs: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)
fs2Sender[IO](2)
}
val events = List("a", "b", "c", "d")
events.foreach { evt => new Thread(() => sender.send(evt)).start() }
Thread sleep 3000
}
def fs2Sender[F[_]: Timer : ContextShift](maxBufferedSize: Int)(implicit F: ConcurrentEffect[F]): Sender = {
// dummy impl
// this is where the actual logic for batching
// and shipping over the network would live
val consume: Pipe[F, Event, Unit] = _.evalMap { event =>
for {
_ <- F.delay { println(s"consuming [$event]...") }
_ <- Timer[F].sleep(1.seconds)
_ <- F.delay { println(s"...[$event] consumed") }
} yield ()
}
val suspended = for {
q <- Queue.bounded[F, Event](maxBufferedSize)
_ <- q.dequeue.through(consume).compile.drain.start
sender <- F.delay[Sender] { evt =>
val enqueue = for {
wasEnqueued <- q.offer1(evt)
_ <- F.delay { println(s"[$evt] enqueued? $wasEnqueued") }
} yield ()
enqueue.toIO.unsafeRunAsyncAndForget()
}
} yield sender
suspended.toIO.unsafeRunSync()
}
}
其主要思想是使用并发队列从FS2。注意,上面的代码表明,无论是Sender
接口也不在main
逻辑可以被改变。只有Sender
接口的实现可以被交换出去。