Spring - 反应式代码 - 更改不传播

问题描述 投票:0回答:1

我正在尝试了解反应式流的传播及其运作方式

所以用例可以说我有一个预订列表,我按年龄分组并按年龄处理预订列表。稍后我使用collectList按年龄分组非常重要。我尝试了 Flux 的 group 和collectMAP,但两者都不适合我。所以我做了collectList,然后使用stream进行分组

现在,对于每个预订集(按年龄分组),我尝试单独处理预订(这不是实际的代码,但为了简化,我做了这样的示例)。我处理预订,然后在处理完所有预订后返回 Mono.empty。

这是代码

@GetMapping("/test")
 public Mono<Void> test() {
   return prepareForBooking();
 }

 @Data
 public class  Booking {
   String name;
   String age;
 }
 private Mono<Void> prepareForBooking() {
   System.out.println("Booking::1");
   Booking booking = new Booking();
   booking.setName("San");
   booking.setAge("18");

   Booking booking1 = new Booking();
   booking1.setName("Man");
   booking1.setAge("19");

   Booking booking2 = new Booking();
   booking2.setName("Dan");
   booking2.setAge("18");

   Booking booking3 = new Booking();
   booking3.setName("Can");
   booking3.setAge("17");

   List<Booking> bookings = new ArrayList<>();
   bookings.add(booking);
   bookings.add(booking1);
   bookings.add(booking2);
   bookings.add(booking3);

   return Flux.fromIterable(bookings)
           .collectList()
           .map(
                   bookingList -> {
                     var bookingsByAge =
                             bookingList.stream()
                                     .collect(Collectors.groupingBy(Booking::getAge));

                     return bookingsByAge.keySet().stream()
                             .map(
                                     age -> {
                                       if (age.equals("18") || age.equals("17")) {
                                         return processBookings(bookingsByAge.get(age));
                                       } else {
                                         return Mono.empty();
                                       }
                                     });
                   })
           .flatMapMany(Flux::fromStream)
           .then();
 }

 private Mono<Void> processBookings(List<Booking> bookings) {

   System.out.println("Booking::2" + bookings);
   return Flux.fromIterable(bookings).flatMap(booking -> this.processBooking(booking)).collectList().flatMap(bookingNames -> {
     System.out.println("Booking::4" + bookingNames);
     return Mono.empty();
   });
 }

 private Mono<String> processBooking(Booking booking) {
   System.out.println("Booking::3" + booking.getName());
   return Mono.just(booking.getName());
 }

我得到的输出

Booking::1
Booking::2[ProviderIntegrationBookingController.Booking(name=Can, age=17)]
Booking::2[ProviderIntegrationBookingController.Booking(name=San, age=18), ProviderIntegrationBookingController.Booking(name=Dan, age=18)]

我所期待的是

Booking::1
Booking::2[ProviderIntegrationBookingController.Booking(name=Can, age=17)]
Booking::2[ProviderIntegrationBookingController.Booking(name=San, age=18), ProviderIntegrationBookingController.Booking(name=Dan, age=18)]
Booking::3 San
Booking::3 Dan
Booking::3 Can
Booking::4 [Can]
Booking::4 [San, Dan]

所以我不确定如何使 processBookings 中的代码在 print 语句后工作,并且我不想订阅(因为这解决了问题,但我认为这不是正确的方法)。我期待链接能够发挥作用

我的疑问是在collectList上,我必须在其中制作年龄和预订的地图

spring spring-webflux
1个回答
0
投票

很少有事情让这本书难以阅读:

  • 您正在编写的代码似乎不起作用,反而似乎会产生许多副作用;这真的是你想要的吗?
  • 然后
    .then()
    在你的管道的末尾完全掩盖了你在使用泛型时最有帮助的朋友之一:对类型的编译检查

根据我对 WebFlux 和泛型的很少的经验,我建议你首先编写返回实际键入结果的代码,这样你就可以确保你有一个有效的订阅链。然后,最终,如果您想在某个时刻屏蔽结果,请通过将其映射到其他内容来将该结果搞砸。

这是我会做的,而不是产生副作用,为您的每个预订请求返回结果:

免责声明:我相当肯定您可以找到此代码的更好、更清晰的版本,我对 WebFlux 不太熟悉。

  @GetMapping("/test")
  public Mono<List<BookingResult>> test() {
    return prepareForBooking();
  }

  public record Booking(String name, String age) {

  }

  public record BookingResult(Booking booking, Status status) {

    public enum Status {
      SUCCESS,
      FAILURE,
      ;
    }

  }

  private Mono<List<BookingResult>> prepareForBooking() {
    System.out.println("Booking::1");

    Booking booking0 = new Booking("San", "18");
    Booking booking1 = new Booking("Man", "19");
    Booking booking2 = new Booking("Dan", "18");
    Booking booking3 = new Booking("Can", "17");

    List<Booking> bookings = new ArrayList<>();
    bookings.add(booking0);
    bookings.add(booking1);
    bookings.add(booking2);
    bookings.add(booking3);

    return Flux.fromIterable(bookings)
               .collectList()
               .map(bookingList -> bookingList.stream().collect(Collectors.groupingBy(Booking::age)))
               .flatMap(bookingsByAge -> bookingsByAge.entrySet()
                                                      .stream()
                                                      .map(this::bookForValidCustomers)
                                                      .reduce(Flux.<BookingResult>empty(),
                                                              (listMono, listMono2) -> listMono.mergeWith(Flux.fromIterable(listMono2)
                                                                                                              .flatMap(Function.identity())),
                                                              Flux::mergeWith)
                                                      .collectList());
  }

  private List<Mono<BookingResult>> bookForValidCustomers(final Entry<String, List<Booking>> entry) {
    if (entry.getKey().equals("18") || entry.getKey().equals("17")) {
      return processBookings(entry.getValue());
    } else {
      return failBookings(entry.getValue());
    }
  }

  private List<Mono<BookingResult>> processBookings(List<Booking> bookings) {
    System.out.println("SuccessPath::2" + bookings);
    return bookings.stream()
                   .map(this::processBooking)
                   .toList();
  }

  private Mono<BookingResult> processBooking(Booking booking) {
    System.out.println("SuccessPath::3" + booking.name());
    return Mono.just(new BookingResult(booking, Status.SUCCESS));
  }

  private List<Mono<BookingResult>> failBookings(List<Booking> bookings) {
    System.out.println("FailurePath::2" + bookings);
    return bookings.stream()
                   .map(this::failBooking)
                   .toList();
  }

  private Mono<BookingResult> failBooking(Booking booking) {
    System.out.println("FailurePath::3" + booking.name());
    return Mono.just(new BookingResult(booking, Status.FAILURE));
  }

它执行以下操作:

➜  ~ curl --silent -X GET --location "http://localhost:8080/test" | jq
[
  {
    "booking": {
      "name": "Can",
      "age": "17"
    },
    "status": "SUCCESS"
  },
  {
    "booking": {
      "name": "San",
      "age": "18"
    },
    "status": "SUCCESS"
  },
  {
    "booking": {
      "name": "Dan",
      "age": "18"
    },
    "status": "SUCCESS"
  },
  {
    "booking": {
      "name": "Man",
      "age": "19"
    },
    "status": "FAILURE"
  }
]

并产生以下副作用:

Booking::1
SuccessPath::2[Booking[name=Can, age=17]]
SuccessPath::3Can
SuccessPath::2[Booking[name=San, age=18], Booking[name=Dan, age=18]]
SuccessPath::3San
SuccessPath::3Dan
FailurePath::2[Booking[name=Man, age=19]]
FailurePath::3Man
© www.soinside.com 2019 - 2024. All rights reserved.