我正在从客户端移动应用程序在我的服务器上存储事件,事件存储为mongodb
。我有mongo-spark连接器,可获取这些事件的列表,并应使用rest api显示它们。它应该在以后流式传输,但是现在我正尝试将其显示为单个呼叫。
到目前为止,我已经编写了如下所示的控制器:
@RestController
@RequestMapping("/analytics")
class EventController @Autowired constructor(val eventMongoServiceImpl: EventMongoServiceImpl,
val javaSparkContext: JavaSparkContext) {
@GetMapping("/event")
fun getEvent(): ResponseEntity<EventResponse> {
val customRdd: JavaMongoRDD<Document> = MongoSpark.load(javaSparkContext)
val toDF = customRdd.toDF()
}
}
[请帮助我过滤以下有关REST API的结果:
[
{
"key": "Event A",
"description": "Event A Description",
"count": 3
},
{
"key": "Event B",
"description": "Event B Description",
"count": 0
}
]
我的数据集如下:
/* 1 */
{
"_id" : ObjectId("5e61e38eb8425d3b1c7679ea"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
/* 2 */
{
"_id" : ObjectId("5e61e416b8425d3b1c7679ec"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
/* 3 */
{
"_id" : ObjectId("5e61e47fb8425d3b1c7679ee"),
"name" : "Event A",
"description" : "Event A Description",
"date" : ISODate("2020-03-05T18:30:00.000Z"),
"_class" : "x"
}
您应该可以在数据框上执行类似的操作
val aggDf = toDf
.groupBy("name")
.agg(count("name"), max("description"))
现在在新数据框aggDf
上,您可以执行aggDf.toJson
并得到结果。如果列与输出不匹配,则可以使用withColumnRenamed