将流媒体流的WebClient帖子拆分为JSON阵列

问题描述 投票:1回答:1

我正在使用第三方REST控制器,它接受一个JSON对象数组并返回单个对象响应。当我使用有限的Flux从WebClient POST时,代码可以工作(我假设,因为Flux完成了)。

但是,当Flux可能无限制时,我该如何;

  1. 以数组的形式发布POST?
  2. 根据POSTed数组捕获响应?
  3. 停止传输Flux

这是我的豆子;

public class Car implements Serializable {

    Long id;

    public Car() {}
    public Car(Long id) { this.id = id; }
    public Long getId() {return id; }
    public void setId(Long id) { this.id = id; }
}

这就是我假设第三方客户端的样子;

@RestController
public class ThirdPartyServer {

    @PostMapping("/cars")
    public CarResponse doCars(@RequestBody List<Car> cars) {
        System.err.println("Got " + cars);
        return new CarResponse("OK");
    }
}

这是我的代码。当我发布flux2时,完成后会发送一个JSON数组。然而,当我发布flux1时,在第一个take(5)之后没有发送任何内容。如何POST下5个下一个块?

@Component
public class MyCarClient {

    public void sendCars() {

//      Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));
        Flux<Car> flux2 = Flux.range(1, 10).map(i -> new Car((long) i));

        WebClient client = WebClient.create("http://localhost:8080");
        client
            .post()
            .uri("/cars")
            .contentType(MediaType.APPLICATION_JSON)
            .body(flux2, Car.class) 
//          .body(flux1.take(5).collectList(), new ParameterizedTypeReference<List<Car>>() {})
            .exchange()
            .subscribe(r -> System.err.println(r.statusCode()));
    }
}
java spring-webflux project-reactor reactive-streams
1个回答
3
投票
  1. 如何在阵列中进行POST?

使用Flux.window的一个变体将主要通量分成窗口通量,然后通过.flatMap使用窗口通量发送请求

        Flux<Car> flux1 = Flux.interval(Duration.ofMillis(250)).map(i -> new Car(i));

        WebClient client = WebClient.create("http://localhost:8080");
        Disposable disposable = flux1
                // 1
                .window(5)
                .flatMap(windowedFlux -> client
                        .post()
                        .uri("/cars")
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(windowedFlux, Car.class)
                        .exchange()
                        // 2
                        .doOnNext(response -> System.out.println(response.statusCode()))
                        .flatMap(response -> response.bodyToMono(...)))
                .subscribe();

        Thread.sleep(10000);

        // 3
        disposable.dispose();

  1. 如何捕获每个POSTed数组的响应?

您可以在.exchange()之后通过运算符分析响应。

在我提供的示例中,响应可以在doOnNext运算符中看到,但是您可以使用任何运算onNext信号的运算符,例如maphandle

确保完全读取响应主体以确保连接返回到池中(请参阅note)。在这里,我使用了.bodyToMono,但任何.body.toEntity方法都可以使用。

  1. 停止Flux的传输?

当您使用subscribe方法时,可以使用返回的disposable.dispose()停止流程。

或者,您可以从sendCars()方法返回Flux并委派订阅并处理给调用者。

请注意,在我提供的示例中,我只使用Thread.sleep()来模拟等待。在实际应用程序中,您应该使用更高级的东西,并避免使用Thread.sleep()

热门问题
推荐问题
最新问题