Spring 服务服务器接收来自 FastAPI AI 服务器的流式响应并将其转发到前端

问题描述 投票:0回答:1

我有两台服务器,带有 Spring boot(java) 的服务服务器和 FastAPI AI 服务器。 AI服务器有响应文本流的API(内容类型为“text/event-stream”)。工作流程如下。

  1. 客户端向 Spring 服务器发送 post 请求,spring 服务器检查身份验证并为 AI 服务器创建 post 请求正文。
  2. Spring 服务器向 AI 服务器发送 post 请求并获取文本流响应。
  3. Spring 服务器对客户端的响应与 AI 服务器的响应相同(文本流)。

我想知道有简单的方法来完成我想要的任务。

我首先尝试使用 Flux 响应。

@PostMapping(value = "/streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamTest() throws Exception {
        WebClient client = WebClient.builder()
                .baseUrl("http://localhost:8000/aichat_dummy")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
        Flux<String> eventStream = client.post()
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class);
        return eventStream;
    }

我还测试了 RestResponse。

@PostMapping(value = "/streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public RestResponse<Flux<ServerSentEvent<String>>> streamTest() throws Exception {
        ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<>() {};
        WebClient client = WebClient.builder()
                .baseUrl("http://192.168.0.130:8000/aichat_dummy")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();

        Flux<ServerSentEvent<String>> eventStream = client.post()
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(type);


        HttpHeaders headers = HttpHeaders.writableHttpHeaders(HttpHeaders.EMPTY);
        headers.add("Transfer-Encoding", "chunked");
        headers.add("Content-Type", "text/event-stream; charset=utf-8");

        return new ResponseEntity<>(eventStream, headers, HttpStatus.OK);
    }

这些代码正常请求ai服务器。但无法响应流式传输到客户端。

spring-boot http streaming
1个回答
0
投票

您可以尝试使用

SseEmitter
退货。像这样:

@PostMapping("/test-stream")
public SseEmitter testStreamChat(@RequestBody @Validated Input input) throws Exception{
    SseEmitter sseEmitter = new SseEmitter();
    ChatRequest chatRequest = new ChatRequest(input.getInput(), null);
    try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().build()){
        SimpleHttpRequest simpleHttpRequest = SimpleRequestBuilder.post(streamChatUrl)
                .setBody(objectMapper.writeValueAsString(chatRequest), ContentType.APPLICATION_JSON)
                .build();
        client.start();
        Future<SimpleHttpResponse> future = client.execute(SimpleRequestProducer.create(simpleHttpRequest),
                new AbstractCharResponseConsumer<>() {

                    @Override
                    protected void start(HttpResponse response, ContentType contentType) {

                    }

                    @Override
                    protected SimpleHttpResponse buildResult() {
                        return null;
                    }

                    @Override
                    public void releaseResources() {

                    }

                    @Override
                    protected int capacityIncrement() {
                        return 1;
                    }

                    @Override
                    protected void data(CharBuffer src, boolean endOfStream) {
                        while (src.hasRemaining()) {
                            char c = src.get();
                            try {
                                sseEmitter.send(SseEmitter.event().data(c));
                            } catch (IOException ioException) {
                                log.error("sse send error", ioException);
                            }
                        }
                        if (endOfStream) {
                            try {
                                sseEmitter.send(SseEmitter.event().data("[DONE]"));
                            } catch (IOException ioException) {
                                log.error("sse send error", ioException);
                            }finally {
                                sseEmitter.complete();
                            }
                        }
                    }

                }, null);
        future.get();
    }
    return sseEmitter;
}
© www.soinside.com 2019 - 2024. All rights reserved.