Mongo Spark Java连接器分组者

问题描述 投票:0回答:1

我正在从客户端移动应用程序在我的服务器上存储事件,事件存储为mongodb。我有mongo-spark连接器,可获取这些事件的列表,并应使用rest api显示它们。它应该在以后流式传输,但是现在我正尝试将其显示为单个呼叫。

到目前为止,我已经编写了如下所示的控制器:

@RestController
@RequestMapping("/analytics")
class EventController @Autowired constructor(val eventMongoServiceImpl: EventMongoServiceImpl,
                                             val javaSparkContext: JavaSparkContext) {

    @GetMapping("/event")
    fun getEvent(): ResponseEntity<EventResponse> {
        val customRdd: JavaMongoRDD<Document> = MongoSpark.load(javaSparkContext)
        val toDF = customRdd.toDF()
    }
}

[请帮助我过滤以下有关REST API的结果:

 [
      {
        "key": "Event A",
        "description": "Event A Description",
        "count": 3
      },
      {
        "key": "Event B",
        "description": "Event B Description",
        "count": 0
      }
    ]

我的数据集如下:

/* 1 */
{
    "_id" : ObjectId("5e61e38eb8425d3b1c7679ea"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

/* 2 */
{
    "_id" : ObjectId("5e61e416b8425d3b1c7679ec"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}

/* 3 */
{
    "_id" : ObjectId("5e61e47fb8425d3b1c7679ee"),
    "name" : "Event A",
    "description" : "Event A Description",
    "date" : ISODate("2020-03-05T18:30:00.000Z"),
    "_class" : "x"
}
spring-boot apache-spark apache-spark-sql apache-spark-dataset
1个回答
0
投票

您应该可以在数据框上执行类似的操作

val aggDf = toDf
 .groupBy("name")
 .agg(count("name"), max("description"))

现在在新数据框aggDf上,您可以执行aggDf.toJson并得到结果。如果列与输出不匹配,则可以使用withColumnRenamed

对其进行调整
© www.soinside.com 2019 - 2024. All rights reserved.