如何使用Reactor Flux处理CSV文件并以JSON输出

问题描述 投票:1回答:1

我有一个要使用Spring Reactor Flux处理的CSV文件。

给出一个带有标题的CSV文件,其中前两列固定,可以有一个以上的可选数据列]

Id, Name, Group, Status
6EF3C06E-6240-1A4A-17D6-27E73F0CDD31, Harlan Ferguson, xy1, true
6B261437-217C-0FDF-741A-92477EE354EC, Risa Greene, xy2, false
4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1, Samson Blanchard, xy3, false
562C3486-E009-2C2D-9D3E-14355DB7D4D7, Damian Carson, xy4, true
...
...
... 

我想使用Flux处理输入这样输出是

[{
    "Id": "6EF3C06E-6240-1A4A-17D6-27E73F0CDD31",
    "Name": "Harlan Ferguson",
    "data": {
        "Group": "xyz1",
        "Status": true
    }
}, {
    "Id": "6B261437-217C-0FDF-741A-92477EE354EC",
    "Name": "Risa Greene",
    "data": {
        "Group": "xy2",
        "Status": false
    }
}, {
    "Id": "4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1",
    "Name": "Samson Blanchard",
    "data": {
        "Group": "xy3",
        "Status": false
    }
}, {
    "Id": "562C3486-E009-2C2D-9D3E-14355DB7D4D7",
    "Name": "Damian Carson",
    "data": {
        "Group": "xy4",
        "Status": true
    }
}]

我正在使用CSVReader进行流式传输和创建,并使用Flux使用

new CSVReader( Files.newBufferedReader(file) );
Flux<String[]> fluxOfCsvRecords = Flux.fromIterable(reader);

几年后我将回到Spring Reactor,所以我的理解有点生疏。

使用创建标题的单声道

Mono<String[]> headerMono = fluxOfCsvRecords.next();

然后,

fluxOfCsvRecords.skip(1)
  .flatMap(csvRecord -> headerMono.map(header -> header[0] + " : " + csvRecord[0]))
.subscribe(System.out::println);

这是中途代码,目的只是为了测试我是否能够组合标题和其余通量中的数据,期望看到

Id : 6EF3C06E-6240-1A4A-17D6-27E73F0CDD31
Id : 6B261437-217C-0FDF-741A-92477EE354EC
Id : 4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1
Id : 562C3486-E009-2C2D-9D3E-14355DB7D4D7

但是我的输出只是

4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1 : 6EF3C06E-6240-1A4A-17D6-27E73F0CDD31

如果有人可以帮助我了解如何实现这一目标,我将不胜感激。

---------------------------更新-------------------- -

尝试另一种方法

Flux<String[]> take1 = fluxOfCsvRecords.take(1);
take1.flatMap(header -> fluxOfCsvRecords.map(csvRecord -> header[0] + " : " + csvRecord[0]))
.subscribe(System.out::println);

输出为

Id : 6B261437-217C-0FDF-741A-92477EE354EC
Id : 4FADC070-FCD0-C7E8-1963-A7FACDB6D8D1
Id : 562C3486-E009-2C2D-9D3E-14355DB7D4D7

缺少标题后的行

spring flux reactor
1个回答
0
投票
public class TopJson { private int Id; private String name; private InnerJson data; public TopJson() {} public TopJson(int id, String name, InnerJson data) { super(); Id = id; this.name = name; this.data = data; } } class InnerJson{ private String group; private String status; public InnerJson() {} public InnerJson(String group, String status) { super(); this.group = group; this.status = status; }

转换为适当的类型并用于创建对象。

fluxOfCsvRecords.skip(1) .map((Function<String, TopJson>) x -> { String[] csvRecord = line.split(",");// a CSV has comma separated lines return new TopJson(Integer.parseInt(csvRecord[0]), csvRecord[1], new InnerJson(csvRecord[2], csvRecord[3])); }).collect(Collectors.toList()));
© www.soinside.com 2019 - 2024. All rights reserved.