我是Scala的新手,我正在开发一个应用程序(库),它是第三方服务的客户端(我无法修改服务器端,它使用自定义二进制协议)。 我使用Netty进行网络连接。
我想设计一个API,允许用户:
我不确定应该如何设计它。 探索Scala,我偶然发现了一些关于Actor模型的信息,但我不确定它是否可以在那里应用,如果可以的话,如何应用。
我想就我应该采取的方式提出一些建议。
通常,向用户代码公开异步功能的Scala-ish方法是返回scala.concurrent.Future[T]
。
如果您要使用actor路径,则可以考虑将二进制通信封装在单个actor类的上下文中。 您可以使用Akka的路由器支持来扩展此代理参与者的实例,并且您可以使用ask模式轻松生成响应期货。 有一些很好的库( Spray , Play Framework )可以使得在Akka上包装例如RESTful甚至WebSocket层几乎是微不足道的。
pub-sub功能的一个很好的模型可能是定义一个Publisher
特性,你可以将它混合到一些actor子类中。 这可以定义一些状态来跟踪订阅者,处理Subscribe
和Unsubscribe
消息,并提供某种方便的广播消息方法:
/**
* Sends a copy of the supplied event object to every subscriber of
* the event object class and superclasses.
*/
protected[this] def publish[T](event: T) {
for (subscriber <- subscribersFor(event)) subscriber ! event
}
这些只是基于在最近的一些项目中做类似事情的一些想法。 如果您需要更具体的指导,请随时详细说明您的用例。 此外, Akka用户列表对于像这样的一般问题是一个很好的资源,如果你真的有兴趣在Scala中探索演员。
看一下spray-client库 。 这提供了HTTP请求功能(我假设您要与之通信的服务器是Web服务?)。 它为您提供了一个非常好的DSL来构建请求,而且都是异步的。 它确实在幕后使用akka Actor模型,但您不必构建自己的Actors来使用它。 相反,您可以使用scala的Future模型异步处理事物。 这里有对Future模型的一个很好的介绍。
spray-client的基本构建块是一个“管道”,它将HttpRequest映射到包含HttpResponse的Future:
// this is from the spray-client docs
val pipeline: HttpRequest => Future[HttpResponse] = sendReceive
val response: Future[HttpResponse] = pipeline(Get("http://spray.io/"))
您可以使用此基本构建块,并通过几个步骤将其构建到客户端API中。 首先,创建一个设置管道的类,并定义一些演示ResponseTransformation技术的中间帮助器:
import scala.concurrent._
import spray.can.client.HttpClient
import spray.client.HttpConduit
import spray.client.HttpConduit._
import spray.http.{HttpRequest, HttpResponse, FormData}
import spray.httpx.unmarshalling.Unmarshaller
import spray.io.IOExtension
type Pipeline = (HttpRequest) => Future[HttpResponse]
// this is basically spray-client boilerplate
def createPipeline(system: ActorSystem, host: String, port: Int): Pipeline = {
val httpClient = system.actorOf(Props(new HttpClient(IOExtension(system).ioBridge())))
val conduit = system.actorOf(props = Props(new HttpConduit(httpClient, host, port)))
sendReceive(conduit)
}
private var pipeline: Pipeline = _
// unmarshalls to a specific type, e.g. a case class representing a datamodel
private def unmarshallingPipeline[T](implicit ec:ExecutionContext, um:Unmarshaller[T]) = (pipeline ~> unmarshal[T])
// for requests that don't return any content. If you get a successful Future it worked; if there's an error you'll get a failed future from the errorFilter below.
private def unitPipeline(implicit ec:ExecutionContext) = (pipeline ~> { _:HttpResponse => () })
// similar to unitPipeline, but where you care about the specific response code.
private def statusPipeline(implicit ec:ExecutionContext) = (pipeline -> {r:HttpResponse => r.status})
// if you want standard error handling create a filter like this
// RemoteServerError and RemoteClientError are custom exception classes
// not shown here.
val errorFilter = { response:HttpResponse =>
if(response.status.isSuccess) response
else if(response.status.value >= 500) throw RemoteServerError(response)
else throw RemoteClientError(response)
}
pipeline = (createPipeline(system, "yourHost", 8080) ~> errorFilter)
然后,您可以将这些包含在与成为公共API的特定请求/响应相关联的方法中。 例如,假设服务有一个“ping”GET端点,它返回一个字符串(“pong”)和一个“表单”POST端点,在这里你发布表单数据并接收一个DataModel作为回报:
def ping()(implicit ec:ExecutionContext, um:Unmarshaller[String]): Future[String] =
unmarshallingPipeline(Get("/ping"))
def form(formData: Map[String, String])(implicit ec:ExecutionContext, um:Unmarshaller[DataModel]): Future[DataModel] =
unmarshallingPipeline(Post("/form"), FormData(formData))
然后有人可以像这样使用API:
import scala.util.{Failure, Success}
API.ping() foreach(println) // will print out "pong" when response comes back
API.form(Map("a" -> "b") onComplete {
case Success(dataModel) => println("Form accepted. Server returned DataModel: " + dataModel)
case Failure(e) => println("Oh noes, the form didn't go through! " + e)
}
我不确定你是否会在spray-client中找到关于订阅活动的第三个要点的直接支持。 这些事件是由服务器生成的,并以某种方式发送到特定HTTP请求范围之外的客户端? 如果是这样,那么spray-client可能无法直接提供帮助(尽管您的事件处理程序仍然可以使用它来发送请求)。 事件是否发生在客户端,例如,最初由服务器响应触发的延迟处理的完成? 如果是这样,你实际上可能只是通过使用Future中的功能,但根据你的用例,使用Actors可能有意义。
这似乎是Obesrvable模式的一个很好的例子。 此模式来自.NET的Reactive Extensions ,但也可用于Java和Scala 。 该图书馆由Netflix提供,质量非常好。
这种模式具有良好的理论基础 - 它是理论意义上的迭代器的双重性。 但更重要的是,它有很多实用的想法。 特别是它处理时间非常好,例如你可以限制你想要的事件率。
通过观察,您可以处理非常高级别的事件。 在.NET中,它看起来很像SQL查询。 您可以注册某些事件(“FROM”),过滤它们(“WHERE”)并最终处理它们(“SELECT”)。 在Scala中,您可以使用标准的monadic API(map,filter,flatMap),当然也可以使用“for expressions”。
一个例子可以看起来像
stackoverflowQuestions.filter(_.tag == "Scala").map(_.subject).throttleLast(1 second).subscribe(println _)
Obeservable消除了基于事件的系统会遇到的许多问题
您的API应该为您拥有的每个事件源提供一个obesrvable。 对于过程调用,您提供了一个将函数调用映射到obesrvable的函数。 此函数将调用远程过程并通过obeservable提供结果。
将以下依赖项添加到build.sbt:
libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.15.0"
然后,您可以使用以下模式将回调转换为obeservable(假设您的远程API具有某种方式来注册和取消注册回调):
private val callbackFunc : (rx.lang.scala.Observer[String]) => rx.lang.scala.Subscription = { o =>
val listener = {
case Value(s) => o.onNext(s)
case Error(e) => o.onError(o)
}
remote.subscribe(listener)
// Return an interface to cancel the subscription
new Subscription {
val unsubscribed = new AtomicBoolean(false)
def isUnsubscribed: Boolean = unsubscribed.get()
val asJavaSubscription: rx.Subscription = new rx.Subscription {
def unsubscribe() {
remote.unsubscribe(listener)
unsubscribed.set(true)
}
}
}
如果您有一些具体问题,请询问,我可以改进答案
Martin Odersky等人的课程非常精彩。 在课程中,涵盖了Observables和其他反应技术。