无法使用 mongodb 在 akka http 中将 scala.concurrent.impl.Promise$Transformation 转换为 scala.collection.immutable.Seq 错误?

问题描述 投票:0回答:2

基本上我尝试使用 akka http 从数据库获取数据。如果我直接在 api 中传递 (EmployeeRepo.findAll()) 那么它会显示所有数据,但是在使用 actor 时它会显示转换错误...... 这里的问题仅用于数据获取,请解决它

这是我的 EmployeeRepo------------------------------------------------

    package org.repo

import org.data.Employee
import org.db.DbConfig
import org.mongodb.scala.MongoCollection
import org.utils.JsonUtils
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.FindOneAndUpdateOptions
import org.mongodb.scala.model.Updates.{combine, set}

import scala.concurrent.Future

object EmployeeRepo extends JsonUtils{
private val employeeDoc:MongoCollection[Employee]=DbConfig.employees

def createCollection()={
DbConfig.database.createCollection("employee").subscribe(
  (result)=>println(s"$result"),
  e=>println(e.getLocalizedMessage),
()=>println("complete")
)
}


  def insertData(emp:Employee)={
    employeeDoc.insertOne(emp).toFuture()
  }
  def findAll()= employeeDoc.find().toFuture()

  def update(emp:Employee,id:String)=
    employeeDoc
      .findOneAndUpdate(equal("_id", id),
        setBsonValue(emp),
        FindOneAndUpdateOptions().upsert(true)).toFuture()

  def delete(id:String)=
    employeeDoc.deleteOne(equal("_id",id)).toFuture()
  private def setBsonValue(emp:Employee)={
    combine(
      set("name",emp.name),
      set("dateOfBirth",emp.dateOfBirth)
    )
  }
}

============员工服务------------ 包 org.service

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.data.Employee
import org.domain.EmployeeRequest
import org.repo.EmployeeRepo

import scala.concurrent.Future

class EmployeeService {

implicit val system=ActorSystem("Employeee")
  implicit val materializer=ActorMaterializer
  import system.dispatcher

def saveEmployeeData= (employeeRequest:EmployeeRequest) => {
val employeeDoc:Employee=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.insertData(employeeDoc)
}
def findAll={
  EmployeeRepo.findAll()
}

  def update(employeeRequest: EmployeeRequest,id:String)={
    val employeeDoc=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.update(emp = employeeDoc,id)
  }
  def delete(id:String)=EmployeeRepo.delete(id)

  private def employeeMapperWithNewId(employee:EmployeeRequest)={
Employee(name=employee.name,dateOfBirth = LocalDate.parse(employee.dateOfBirth, DateTimeFormatter.ISO_DATE),
_id=UUID.randomUUID.toString)
  }
}

-----------------EmployeeActor------

package org.actor

import akka.actor.{Actor, ActorLogging}
import org.actor.EmployeeActor.{Delete, Save, SearchAll, Update}
import org.data.Employee
import org.domain.EmployeeRequest
import org.service.EmployeeService

object EmployeeActor {

  case class Save(emp: EmployeeRequest)

  case object SearchAll

  case class Update(emp: EmployeeRequest, id: String)

  case class Delete(id: String)

}

class
EmployeeActor extends Actor with ActorLogging {
  private val employeeService: EmployeeService = new EmployeeService()

  override def receive: Receive = {
    case Save(emp) =>
      log.info(s"recevied msg saved with employee :$emp")
      sender() ! employeeService.saveEmployeeData(emp)

    case SearchAll =>
      log.info("received msg find ALL")
      sender() ! employeeService.findAll

    case Update(emp, id) =>
      log.info(s"received update msg for id $id and employee $emp")
      sender() ! employeeService.update(emp, id)
    case Delete(id) =>
      log.info(s"received msg for deleting employee of id $id")
      sender() ! employeeService.delete(id)
    case _ =>
      log.debug("Unhandled msg")


  }
}

----------------员工路线----

    package org
    
    import akka.actor.{ActorSystem, Props}
    import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
    import akka.stream.ActorMaterializer
    import org.actor.EmployeeActor
    import org.utils.{JsonUtils, TimeUtils}
    import akka.http.scaladsl.server.Directives._
    import akka.{NotUsed, util}
    import akka.util.{ByteString, Timeout}
    import org.data.Employee
    import org.domain.EmployeeRequest
    import akka.pattern.{Patterns, ask}
    import akka.stream.scaladsl.Source
    import org.actor.EmployeeActor.{Delete, Save, SearchAll, Update}
    import org.service.EmployeeService
    
    import scala.concurrent.duration._
    import spray.json._
    
    import scala.concurrent.{Await, Future}
    
    class EmployeeRoute extends JsonUtils{
    
      implicit val system=ActorSystem("Employee")
      implicit val materializer=ActorMaterializer
      import system.dispatcher
      val actor=system.actorOf(Props[EmployeeActor],"employeeActor")
      val employeeService=new EmployeeService()
    
      implicit val timeOut=Timeout(5.seconds)
    
      val getRoute={
        pathPrefix("employee"){
    
          (pathEndOrSingleSlash & get){
            complete((actor ? SearchAll).mapTo[Seq[EmployeeRequest]])
  }~
          ( path("update") & put &  parameter("id".as[String])){id=>
            entity(as[EmployeeRequest]){employee=>
              complete((actor ? Update(employee,id)).map(_=>StatusCodes.OK))
            }
          }~
          post{
            entity(as[EmployeeRequest]) { employee =>
              complete((actor ? Save(employee)).map(_ => StatusCodes.OK))
            }
          }~
            delete{
              (path(Segment) |parameter("id".as[String])){id=>
                complete((actor ? Delete(id)).map(_=>StatusCodes.OK))
              }
            }
    
    
    
        }
      }
    }

==================错误================

[ERROR] [09/09/2020 19:46:48.551] [web-app-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(web-app)] Error during processing of request: 'Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.lang.ClassCastException: Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq
    at java.base/java.lang.Class.cast(Class.java:3734)
    at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:464)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:430)
    at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:164)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:392)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:299)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:249)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:242)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:615)
    at org.actor.EmployeeActor$$anonfun$receive$1.applyOrElse(EmployeeActor.scala:32)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at org.actor.EmployeeActor.aroundReceive(EmployeeActor.scala:22)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

请帮帮我..

mongodb scala postman akka-http akka-actor
2个回答
2
投票

您几乎分析了所有内容。

EmployeeRepo.findAll
是你的问题。你不应该在 akka actor 中使用 Futures。应使用
pipeTo
来代替。

请尝试更新

EmployeeActor

case SearchAll =>
  log.info("received msg find ALL")
  employeeService.findAll.pipeTo(sender())

0
投票

我也遇到了类似的错误,我通过-

解决了

覆盖默认接收:接收= {

case CreateTask(task) =>
  val senderRef=sender()
  log.info("creating a task")
   taskService.createTask(task).map{
   _ =>
     senderRef ! "task created successfully"
 }}

instead of 
case CreateTask(task) => 
log.info("creating a task")
sender ! taskService.createTask(task)
© www.soinside.com 2019 - 2024. All rights reserved.