我正在尝试了解反应式流的传播及其运作方式
所以用例可以说我有一个预订列表,我按年龄分组并按年龄处理预订列表。稍后我使用collectList按年龄分组非常重要。我尝试了 Flux 的 group 和collectMAP,但两者都不适合我。所以我做了collectList,然后使用stream进行分组
现在,对于每个预订集(按年龄分组),我尝试单独处理预订(这不是实际的代码,但为了简化,我做了这样的示例)。我处理预订,然后在处理完所有预订后返回 Mono.empty。
这是代码
@GetMapping("/test")
public Mono<Void> test() {
return prepareForBooking();
}
@Data
public class Booking {
String name;
String age;
}
private Mono<Void> prepareForBooking() {
System.out.println("Booking::1");
Booking booking = new Booking();
booking.setName("San");
booking.setAge("18");
Booking booking1 = new Booking();
booking1.setName("Man");
booking1.setAge("19");
Booking booking2 = new Booking();
booking2.setName("Dan");
booking2.setAge("18");
Booking booking3 = new Booking();
booking3.setName("Can");
booking3.setAge("17");
List<Booking> bookings = new ArrayList<>();
bookings.add(booking);
bookings.add(booking1);
bookings.add(booking2);
bookings.add(booking3);
return Flux.fromIterable(bookings)
.collectList()
.map(
bookingList -> {
var bookingsByAge =
bookingList.stream()
.collect(Collectors.groupingBy(Booking::getAge));
return bookingsByAge.keySet().stream()
.map(
age -> {
if (age.equals("18") || age.equals("17")) {
return processBookings(bookingsByAge.get(age));
} else {
return Mono.empty();
}
});
})
.flatMapMany(Flux::fromStream)
.then();
}
private Mono<Void> processBookings(List<Booking> bookings) {
System.out.println("Booking::2" + bookings);
return Flux.fromIterable(bookings).flatMap(booking -> this.processBooking(booking)).collectList().flatMap(bookingNames -> {
System.out.println("Booking::4" + bookingNames);
return Mono.empty();
});
}
private Mono<String> processBooking(Booking booking) {
System.out.println("Booking::3" + booking.getName());
return Mono.just(booking.getName());
}
我得到的输出
Booking::1
Booking::2[ProviderIntegrationBookingController.Booking(name=Can, age=17)]
Booking::2[ProviderIntegrationBookingController.Booking(name=San, age=18), ProviderIntegrationBookingController.Booking(name=Dan, age=18)]
我所期待的是
Booking::1
Booking::2[ProviderIntegrationBookingController.Booking(name=Can, age=17)]
Booking::2[ProviderIntegrationBookingController.Booking(name=San, age=18), ProviderIntegrationBookingController.Booking(name=Dan, age=18)]
Booking::3 San
Booking::3 Dan
Booking::3 Can
Booking::4 [Can]
Booking::4 [San, Dan]
所以我不确定如何使 processBookings 中的代码在 print 语句后工作,并且我不想订阅(因为这解决了问题,但我认为这不是正确的方法)。我期待链接能够发挥作用
我的疑问是在collectList上,我必须在其中制作年龄和预订的地图
很少有事情让这本书难以阅读:
.then()
在你的管道的末尾完全掩盖了你在使用泛型时最有帮助的朋友之一:对类型的编译检查根据我对 WebFlux 和泛型的很少的经验,我建议你首先编写返回实际键入结果的代码,这样你就可以确保你有一个有效的订阅链。然后,最终,如果您想在某个时刻屏蔽结果,请通过将其映射到其他内容来将该结果搞砸。
这是我会做的,而不是产生副作用,为您的每个预订请求返回结果:
免责声明:我相当肯定您可以找到此代码的更好、更清晰的版本,我对 WebFlux 不太熟悉。
@GetMapping("/test")
public Mono<List<BookingResult>> test() {
return prepareForBooking();
}
public record Booking(String name, String age) {
}
public record BookingResult(Booking booking, Status status) {
public enum Status {
SUCCESS,
FAILURE,
;
}
}
private Mono<List<BookingResult>> prepareForBooking() {
System.out.println("Booking::1");
Booking booking0 = new Booking("San", "18");
Booking booking1 = new Booking("Man", "19");
Booking booking2 = new Booking("Dan", "18");
Booking booking3 = new Booking("Can", "17");
List<Booking> bookings = new ArrayList<>();
bookings.add(booking0);
bookings.add(booking1);
bookings.add(booking2);
bookings.add(booking3);
return Flux.fromIterable(bookings)
.collectList()
.map(bookingList -> bookingList.stream().collect(Collectors.groupingBy(Booking::age)))
.flatMap(bookingsByAge -> bookingsByAge.entrySet()
.stream()
.map(this::bookForValidCustomers)
.reduce(Flux.<BookingResult>empty(),
(listMono, listMono2) -> listMono.mergeWith(Flux.fromIterable(listMono2)
.flatMap(Function.identity())),
Flux::mergeWith)
.collectList());
}
private List<Mono<BookingResult>> bookForValidCustomers(final Entry<String, List<Booking>> entry) {
if (entry.getKey().equals("18") || entry.getKey().equals("17")) {
return processBookings(entry.getValue());
} else {
return failBookings(entry.getValue());
}
}
private List<Mono<BookingResult>> processBookings(List<Booking> bookings) {
System.out.println("SuccessPath::2" + bookings);
return bookings.stream()
.map(this::processBooking)
.toList();
}
private Mono<BookingResult> processBooking(Booking booking) {
System.out.println("SuccessPath::3" + booking.name());
return Mono.just(new BookingResult(booking, Status.SUCCESS));
}
private List<Mono<BookingResult>> failBookings(List<Booking> bookings) {
System.out.println("FailurePath::2" + bookings);
return bookings.stream()
.map(this::failBooking)
.toList();
}
private Mono<BookingResult> failBooking(Booking booking) {
System.out.println("FailurePath::3" + booking.name());
return Mono.just(new BookingResult(booking, Status.FAILURE));
}
它执行以下操作:
➜ ~ curl --silent -X GET --location "http://localhost:8080/test" | jq
[
{
"booking": {
"name": "Can",
"age": "17"
},
"status": "SUCCESS"
},
{
"booking": {
"name": "San",
"age": "18"
},
"status": "SUCCESS"
},
{
"booking": {
"name": "Dan",
"age": "18"
},
"status": "SUCCESS"
},
{
"booking": {
"name": "Man",
"age": "19"
},
"status": "FAILURE"
}
]
并产生以下副作用:
Booking::1
SuccessPath::2[Booking[name=Can, age=17]]
SuccessPath::3Can
SuccessPath::2[Booking[name=San, age=18], Booking[name=Dan, age=18]]
SuccessPath::3San
SuccessPath::3Dan
FailurePath::2[Booking[name=Man, age=19]]
FailurePath::3Man