为客户端设计API到第三方服务

问题描述 投票:4回答:3

我是Scala的新手,我正在开发一个应用程序(库),它是第三方服务的客户端(我无法修改服务器端,它使用自定义二进制协议)。 我使用Netty进行网络连接。

我想设计一个API,允许用户:

  • 将请求发送到服务器
  • 将请求发送到服务器并异步获取响应
  • 订阅由服务器触发的事件(具有多个异步事件处理程序,它们也应该能够发送请求)

我不确定应该如何设计它。 探索Scala,我偶然发现了一些关于Actor模型的信息,但我不确定它是否可以在那里应用,如果可以的话,如何应用。

我想就我应该采取的方式提出一些建议。

api scala
3个回答
4
投票

通常,向用户代码公开异步功能的Scala-ish方法是返回scala.concurrent.Future[T]

如果您要使用actor路径,则可以考虑将二进制通信封装在单个actor类的上下文中。 您可以使用Akka的路由器支持来扩展此代理参与者的实例,并且您可以使用ask模式轻松生成响应期货。 有一些很好的库( SprayPlay Framework )可以使得在Akka上包装例如RESTful甚至WebSocket层几乎是微不足道的。

pub-sub功能的一个很好的模型可能是定义一个Publisher特性,你可以将它混合到一些actor子类中。 这可以定义一些状态来跟踪订阅者,处理SubscribeUnsubscribe消息,并提供某种方便的广播消息方法:

  /**
    * Sends a copy of the supplied event object to every subscriber of
    * the event object class and superclasses.
    */
  protected[this] def publish[T](event: T) {
    for (subscriber <- subscribersFor(event)) subscriber ! event
  }

这些只是基于在最近的一些项目中做类似事情的一些想法。 如果您需要更具体的指导,请随时详细说明您的用例。 此外, Akka用户列表对于像这样的一般问题是一个很好的资源,如果你真的有兴趣在Scala中探索演员。


2
投票

看一下spray-client库 。 这提供了HTTP请求功能(我假设您要与之通信的服务器是Web服务?)。 它为您提供了一个非常好的DSL来构建请求,而且都是异步的。 它确实在幕后使用akka Actor模型,但您不必构建自己的Actors来使用它。 相反,您可以使用scala的Future模型异步处理事物。 这里有对Future模型的一个很好的介绍。

spray-client的基本构建块是一个“管道”,它将HttpRequest映射到包含HttpResponse的Future:

// this is from the spray-client docs
val pipeline: HttpRequest => Future[HttpResponse] = sendReceive

val response: Future[HttpResponse] = pipeline(Get("http://spray.io/"))

您可以使用此基本构建块,并通过几个步骤将其构建到客户端API中。 首先,创建一个设置管道的类,并定义一些演示ResponseTransformation技术的中间帮助器:

import scala.concurrent._
import spray.can.client.HttpClient
import spray.client.HttpConduit
import spray.client.HttpConduit._
import spray.http.{HttpRequest, HttpResponse, FormData}
import spray.httpx.unmarshalling.Unmarshaller
import spray.io.IOExtension

type Pipeline = (HttpRequest) => Future[HttpResponse]

// this is basically spray-client boilerplate     
def createPipeline(system: ActorSystem, host: String, port: Int): Pipeline = {
    val httpClient = system.actorOf(Props(new HttpClient(IOExtension(system).ioBridge())))
    val conduit = system.actorOf(props = Props(new HttpConduit(httpClient, host, port)))

    sendReceive(conduit)
}

private var pipeline: Pipeline = _
// unmarshalls to a specific type, e.g. a case class representing a datamodel
private def unmarshallingPipeline[T](implicit ec:ExecutionContext, um:Unmarshaller[T]) = (pipeline ~> unmarshal[T])
// for requests that don't return any content.  If you get a successful Future it worked; if there's an error you'll get a failed future from the errorFilter below.
private def unitPipeline(implicit ec:ExecutionContext) = (pipeline ~>  { _:HttpResponse => () })
// similar to unitPipeline, but where you care about the specific response code.
private def statusPipeline(implicit ec:ExecutionContext) = (pipeline -> {r:HttpResponse => r.status})

// if you want standard error handling create a filter like this
// RemoteServerError and RemoteClientError are custom exception classes
// not shown here.
val errorFilter = { response:HttpResponse =>
  if(response.status.isSuccess) response
  else if(response.status.value >= 500) throw RemoteServerError(response)
  else throw RemoteClientError(response)
}

pipeline = (createPipeline(system, "yourHost", 8080) ~> errorFilter)

然后,您可以将这些包含在与成为公共API的特定请求/响应相关联的方法中。 例如,假设服务有一个“ping”GET端点,它返回一个字符串(“pong”)和一个“表单”POST端点,在这里你发布表单数据并接收一个DataModel作为回报:

def ping()(implicit ec:ExecutionContext, um:Unmarshaller[String]): Future[String] =
    unmarshallingPipeline(Get("/ping"))

def form(formData: Map[String, String])(implicit ec:ExecutionContext, um:Unmarshaller[DataModel]): Future[DataModel] = 
    unmarshallingPipeline(Post("/form"), FormData(formData)) 

然后有人可以像这样使用API​​:

import scala.util.{Failure, Success}

API.ping() foreach(println)  // will print out "pong" when response comes back

API.form(Map("a" -> "b") onComplete {
    case Success(dataModel) => println("Form accepted. Server returned DataModel: " + dataModel)
    case Failure(e) => println("Oh noes, the form didn't go through! " + e)
}

我不确定你是否会在spray-client中找到关于订阅活动的第三个要点的直接支持。 这些事件是由服务器生成的,并以某种方式发送到特定HTTP请求范围之外的客户端? 如果是这样,那么spray-client可能无法直接提供帮助(尽管您的事件处理程序仍然可以使用它来发送请求)。 事件是否发生在客户端,例如,最初由服务器响应触发的延迟处理的完成? 如果是这样,你实际上可能只是通过使用Future中的功能,但根据你的用例,使用Actors可能有意义。


2
投票

观测

这似乎是Obesrvable模式的一个很好的例子。 此模式来自.NETReactive Extensions ,但也可用于JavaScala 。 该图书馆由Netflix提供,质量非常好。

这种模式具有良好的理论基础 - 它是理论意义上的迭代器的双重性。 但更重要的是,它有很多实用的想法。 特别是它处理时间非常好,例如你可以限制你想要的事件率。

通过观察,您可以处理非常高级别的事件。 在.NET中,它看起来很像SQL查询。 您可以注册某些事件(“FROM”),过滤它们(“WHERE”)并最终处理它们(“SELECT”)。 在Scala中,您可以使用标准的monadic API(map,filter,flatMap),当然也可以使用“for expressions”。

一个例子可以看起来像

stackoverflowQuestions.filter(_.tag == "Scala").map(_.subject).throttleLast(1 second).subscribe(println _)

Obeservable消除了基于事件的系统会遇到的许多问题

  • 处理订阅
  • 处理错误
  • 过滤和预处理事件
  • 缓冲事件

构建API

您的API应该为您拥有的每个事件源提供一个obesrvable。 对于过程调用,您提供了一个将函数调用映射到obesrvable的函数。 此函数将调用远程过程并通过obeservable提供结果。

实施细节

将以下依赖项添加到build.sbt:

libraryDependencies +=     "com.netflix.rxjava" % "rxjava-scala" % "0.15.0"

然后,您可以使用以下模式将回调转换为obeservable(假设您的远程API具有某种方式来注册和取消注册回调):

private val callbackFunc : (rx.lang.scala.Observer[String]) => rx.lang.scala.Subscription = { o =>
  val listener = {
    case Value(s) => o.onNext(s)
    case Error(e) => o.onError(o)
  }

  remote.subscribe(listener)

  // Return an interface to cancel the subscription
  new Subscription {
    val unsubscribed = new AtomicBoolean(false)
    def isUnsubscribed: Boolean = unsubscribed.get()

    val asJavaSubscription: rx.Subscription = new rx.Subscription {
      def unsubscribe() {
        remote.unsubscribe(listener)
        unsubscribed.set(true)
      }
    }
  }

如果您有一些具体问题,请询问,我可以改进答案

额外的资源

Martin Odersky等人的课程非常精彩。 在课程中,涵盖了Observables和其他反应技术。

© www.soinside.com 2019 - 2024. All rights reserved.