阿卡HTTP流反序列化JSON

问题描述 投票:8回答:3

是否有可能动态地反序列化外部,未知长度的,从ByteString阿卡HTTP流分成域对象?


Context

我称之为无限长HTTP端点输出,保持增长的一个JSON Array

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight
json akka akka-stream akka-http
3个回答
0
投票

我认为play-iteratees-extras必须帮助你。这个库允许通过枚举/ Iteratee模式来解析JSON,当然,不等待接收的所有数据。

例如,以免建立字节“无限”流,它表示“无限大” JSON数组。

import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee}

var i = 0
var isFirstWas = false

val max = 10000

val stream = Enumerator("[".getBytes) andThen Enumerator.generateM {
  Future {
    i += 1
    if (i < max) {
      val json = Json.stringify(Json.obj(
        "prop" -> Random.nextBoolean(),
        "prop2" -> Random.nextBoolean(),
        "prop3" -> Random.nextInt(),
        "prop4" -> Random.alphanumeric.take(5).mkString("")
      ))

      val string = if (isFirstWas) {
        "," + json
      } else {
        isFirstWas = true
        json
      }


      Some(Codec.utf_8.encode(string))
    } else if (i == max) Some("]".getBytes) // <------ this is the last jsArray closing tag
    else None

  }
}

好,该值包含10000(或多个)对象的jsArray。允许定义的情况下的类,将被包含在我们的数组中的每个对象的数据。

case class Props(prop: Boolean, prop2: Boolean, prop3: Int, prop4: String)

现在写解析器,这将是分析每个项目

import play.extras.iteratees._    
import JsonBodyParser._
import JsonIteratees._
import JsonEnumeratees._

val parser = jsArray(jsValues(jsSimpleObject)) ><> Enumeratee.map { json =>
  for {
    prop <- json.\("prop").asOpt[Boolean]
    prop2 <- json.\("prop2").asOpt[Boolean]
    prop3 <- json.\("prop3").asOpt[Int]
    prop4 <- json.\("prop4").asOpt[String]
  } yield Props(prop, prop2, prop3, prop4)
}

请参阅docjsArrayjsValuesjsSimpleObject。创建结果制作:

val result = stream &> Encoding.decode() ><> parser

从JsonIteratees包Encoding.decode()将解码字节CharStringresult值的类型Enumerator[Option[Item]],你可以应用一些iteratee这个枚举启动解析过程。

总体而言,我不知道你是怎么得到字节(解决方案在很大程度上取决于这一点),但我认为这显示您的问题可能的解决方案之一。


0
投票

我有一个非常类似的问题,试图将Twitter的数据流(无限字符串)解析为一个域对象。我解决了它使用Json4s,就像这样:

case class Tweet(username: String, geolocation: Option[Geo])
case class Geo(latitude: Float, longitude: Float)
object Tweet{
    def apply(s: String): Tweet = {
        parse(StringInput(s), useBigDecimalForDouble = false, useBigIntForLong = false).extract[Tweet]
    }
}

然后,我只是缓冲流,并将其映射到了一条Twitter消息:

val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
var line = reader.readLine()
while(line != null){
    store(Tweet.apply(line))
    line = reader.readLine()
}

Json4s拥有选项(在该示例中在对象内部或自定义对象,像GEO)的完全支持。因此,你可以把一个选项像我一样,如果现场没有在JSON来了,它将被设置为无。

希望能帮助到你!


0
投票

我想这应该JsonFraming.objectScanner(Int.MaxValue)在这种情况下使用。作为文档状态:

返回实现发射有效的JSON块一个“梅开二度计数”的基础框架操作的流程。它扫描输入数据流为有效的JSON对象并返回仅包含这些有效块字节串的数据块。一个可能希望使用此操作符的帧数据的典型实例包括:非常大的阵列

所以,你可以像这样结束了:

val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue))
      .map(_.utf8String)         // In case you have ByteString
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)
      .runWith(Sink.ignore)      // Do whatever you need here 
  case Failure(exception) => log.error(exception, "Api call failed")
}
© www.soinside.com 2019 - 2024. All rights reserved.