在 Scala 中解组 protobuf

问题描述 投票:0回答:1

我之前已经使用 ScalaPB 和以下编组器在 Scala 中进行了解组工作:

implicit def marshaller[T <: GeneratedMessage]: ToEntityMarshaller[T] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[T](r => r.toByteArray)

implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[Request]](implicit companion: GeneratedMessageCompanion[Request]): FromEntityUnmarshaller[Request] = {
    Unmarshaller.byteArrayUnmarshaller.map[Request](bytes => companion.parseFrom(bytes))
  }

这允许我的

Route
接受类型为
Request
的传入消息,定义为:

syntax = "proto3";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message RegisterRequest {
  string username = 1;
  optional string password = 2;
}

message Request {
  string hostID = 1;

  oneof requestType {
    RegisterRequest registerRequest = 2;
  }
}

我在系统中添加了另一个

Route
,它接受
DataRequest
类型。这定义为:

syntax = "proto3";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message DataRequest {
  string hostID = 1;
  string data = 2;
}

因此,我修改了 AKKA 参与者和路由,以使用通配符类型作为它们接收和响应的消息类型,定义为:

  final case class ActorRequest[T, E](request: T, replyTo: ActorRef[ActorResponse[E]])

  final case class ActorResponse[T](response: T)

为了减少重复代码,我将

Route
创建移至超类中。超类
Layer
看起来像:

trait Marshalling extends DefaultJsonProtocol with SprayJsonSupport {
  
  implicit def marshaller[E <: GeneratedMessage]: ToEntityMarshaller[E] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[E](r => r.toByteArray)

  implicit def unmarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[T]](implicit companion: GeneratedMessageCompanion[T]): FromEntityUnmarshaller[T] = {
    Unmarshaller.byteArrayUnmarshaller.map[T](bytes => companion.parseFrom(bytes))
  }
  
}

abstract class Layer[T <: GeneratedMessage, E <: GeneratedMessage](name: String, directivePath: String)
  extends CORSHandler with Marshalling {

  implicit val timeout: Timeout = Timeout.create(SYSTEM.settings.config.getDuration("my-app.routes.ask-timeout"))

  private var systemActor: ActorRef[ActorRequest[T, E]] = null

  def createResponse(request: T): ActorResponse[E]

  private def createRoutes(): Route = {
    pathPrefix(HOST_ID) {
      path(directivePath) {
        post {
          entity(as[T]) { request =>
            onComplete(handle(request)) {
              case Success(response) =>
                complete(response.response)
              case Failure(exception) => complete(InternalServerError, s"An error occurred ${exception.getMessage}")
            }
          }
        }
      }
    }
  }

...
}

切换到通配符 Unmarshaller 时,出现以下错误:

I found:

    akka.http.scaladsl.unmarshalling.Unmarshaller.
      messageUnmarshallerFromEntityUnmarshaller[T](
      akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.
        sprayJsonUnmarshaller[T](/* missing */summon[spray.json.RootJsonReader[T]])
    )

But no implicit values were found that match type spray.json.RootJsonReader[T].
          entity(as[T]) { request =>

有这方面的专家可以帮助我找出问题吗?该错误似乎是在抱怨它不是

FromRequestMarshaller
,但之前定义类类型时的 Unmarshaller 也不是。有什么建议吗?

最小可重现示例:https://github.com/ritcat14/Hydra_broken_marshalling

json scala unmarshalling akka-http scalapb
1个回答
0
投票

因此,虽然这“解决”了问题,但这是一个令人讨厌的解决方法,并将修复放在 protobuf 端而不是修复通配符编组,但目前这是可行的。

我创建了一个

ProtoMessage
protobuf,它只有一个包含所有消息类型的
oneof
字段,如下所示:

syntax = "proto3";

import "Request.proto";
import "DataRequest.proto";
import "Response.proto";
import "DataResponse.proto";

package PROTOS;

option java_package = "hydra.core.messaging.protobuf";

message UnknownType {}

message ProtoMessage {
  string hostID = 1;

  oneof messageType {
    Request request = 2;
    DataRequest dataRequest = 3;

    Response response = 4;
    DataResponse dataResponse = 5;

    UnknownType unknownType = 6;
  }
}

然后我改为编组/取消编组该对象:

  implicit def protobufMarshaller[T <: GeneratedMessage]: ToEntityMarshaller[ProtoMessage] = PredefinedToEntityMarshallers.ByteArrayMarshaller.compose[ProtoMessage](r => r.toByteArray)

  implicit def requestMarshaller[T <: GeneratedMessage with GeneratedMessageCompanion[ProtoMessage]](implicit companion: GeneratedMessageCompanion[ProtoMessage]): FromEntityUnmarshaller[ProtoMessage] = {
    Unmarshaller.byteArrayUnmarshaller.map[ProtoMessage](bytes => companion.parseFrom(bytes))
  }

我用

ProtoMessage
替换了所有通配符类类型。

这是一个解决方案,是的。可怕吗?绝对地。这是永久修复吗?没有。与此修复程序一样,我仍然愿意接受建议,每次我收到每个

messageType
的消息时,我都必须在
Route
字段上进行过滤。

© www.soinside.com 2019 - 2024. All rights reserved.