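I have two servers: a service server running Spring Boot (Java) and an AI server running FastAPI. The AI server exposes an API that responds with a text stream (content type "text/event-stream"). The workflow is as follows.
I would like to know whether there is a simple way to accomplish this.
I first tried returning a Flux.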
    @PostMapping(value = "/streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamTest() throws Exception {
        WebClient client = WebClient.builder()
                .baseUrl("http://localhost:8000/aichat_dummy")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
        Flux<String> eventStream = client.post()
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(String.class);
        return eventStream;
    }
I also tested ResponseEntity.
    @PostMapping(value = "/streaming", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<Flux<ServerSentEvent<String>>> streamTest() throws Exception {
        ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<>() {};
        WebClient client = WebClient.builder()
                .baseUrl("http://192.168.0.130:8000/aichat_dummy")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
        Flux<ServerSentEvent<String>> eventStream = client.post()
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(type);
        HttpHeaders headers = HttpHeaders.writableHttpHeaders(HttpHeaders.EMPTY);
        headers.add("Transfer-Encoding", "chunked");
        headers.add("Content-Type", "text/event-stream; charset=utf-8");
        return new ResponseEntity<>(eventStream, headers, HttpStatus.OK);
    }
Both versions send the request to the AI server correctly, but the response is not streamed through to the client.
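For reference, whatever the framework, a relay only streams if each chunk is written and flushed as soon as it arrives from the upstream server. The following is a minimal JDK-only sketch of that forwarding loop, not Spring code. The class name `SseRelaySketch`, the in-process dummy upstream, and the three sample tokens are all assumptions made for illustration; only the `/aichat_dummy` and `/streaming` paths are borrowed from the snippets above.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical, JDK-only illustration: dummy upstream -> relay -> client.
class SseRelaySketch {
    static final HttpClient CLIENT = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_1_1)
            .build();

    static HttpRequest post(URI uri) {
        return HttpRequest.newBuilder(uri)
                .header("Accept", "text/event-stream")
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    static URI uriOf(HttpServer server, String path) {
        return URI.create("http://127.0.0.1:" + server.getAddress().getPort() + path);
    }

    // Stand-in for the AI server: emits three SSE events (assumed sample data).
    static HttpServer startDummyUpstream() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/aichat_dummy", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "text/event-stream");
            exchange.sendResponseHeaders(200, 0); // length 0 selects chunked transfer encoding
            try (OutputStream out = exchange.getResponseBody()) {
                for (String token : List.of("Hello", "from", "upstream")) {
                    out.write(("data: " + token + "\n\n").getBytes(StandardCharsets.UTF_8));
                    out.flush();
                }
            }
        });
        server.start();
        return server;
    }

    // The relay: reads the upstream body line by line and forwards each line immediately.
    static HttpServer startRelay(URI upstream) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/streaming", exchange -> {
            try {
                HttpResponse<Stream<String>> upstreamResponse =
                        CLIENT.send(post(upstream), HttpResponse.BodyHandlers.ofLines());
                exchange.getResponseHeaders().add("Content-Type", "text/event-stream");
                exchange.sendResponseHeaders(200, 0);
                try (Stream<String> lines = upstreamResponse.body();
                     OutputStream out = exchange.getResponseBody()) {
                    Iterator<String> it = lines.iterator();
                    while (it.hasNext()) {
                        out.write((it.next() + "\n").getBytes(StandardCharsets.UTF_8));
                        out.flush(); // forward each line as soon as it has been read
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                exchange.close();
            }
        });
        server.start();
        return server;
    }

    // Runs the whole chain once and returns the "data:" payloads the client received.
    static List<String> demo() throws Exception {
        HttpServer upstream = startDummyUpstream();
        HttpServer relay = startRelay(uriOf(upstream, "/aichat_dummy"));
        try (Stream<String> lines = CLIENT
                .send(post(uriOf(relay, "/streaming")), HttpResponse.BodyHandlers.ofLines())
                .body()) {
            return lines.filter(line -> line.startsWith("data: "))
                    .map(line -> line.substring("data: ".length()))
                    .collect(Collectors.toList());
        } finally {
            relay.stop(0);
            upstream.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

The sketch only checks that the events pass through the relay intact. In a Spring application the same idea applies: the controller has to hand the upstream data to the response as it arrives instead of collecting it first.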
You can try returning an SseEmitter, like this:
    @PostMapping("/test-stream")
    public SseEmitter testStreamChat(@RequestBody @Validated Input input) throws Exception {
        SseEmitter sseEmitter = new SseEmitter();
        ChatRequest chatRequest = new ChatRequest(input.getInput(), null);
        try (CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().build()) {
            SimpleHttpRequest simpleHttpRequest = SimpleRequestBuilder.post(streamChatUrl)
                    .setBody(objectMapper.writeValueAsString(chatRequest), ContentType.APPLICATION_JSON)
                    .build();
            client.start();
            Future<SimpleHttpResponse> future = client.execute(SimpleRequestProducer.create(simpleHttpRequest),
                    new AbstractCharResponseConsumer<>() {
                        @Override
                        protected void start(HttpResponse response, ContentType contentType) {
                        }

                        @Override
                        protected SimpleHttpResponse buildResult() {
                            return null;
                        }

                        @Override
                        public void releaseResources() {
                        }

                        @Override
                        protected int capacityIncrement() {
                            return 1;
                        }

                        // Called for each chunk of decoded characters from the upstream response.
                        @Override
                        protected void data(CharBuffer src, boolean endOfStream) {
                            // Forward every character as its own SSE event.
                            while (src.hasRemaining()) {
                                char c = src.get();
                                try {
                                    sseEmitter.send(SseEmitter.event().data(c));
                                } catch (IOException ioException) {
                                    log.error("sse send error", ioException);
                                }
                            }
                            // Signal the end of the stream to the client, then complete the emitter.
                            if (endOfStream) {
                                try {
                                    sseEmitter.send(SseEmitter.event().data("[DONE]"));
                                } catch (IOException ioException) {
                                    log.error("sse send error", ioException);
                                } finally {
                                    sseEmitter.complete();
                                }
                            }
                        }
                    }, null);
            // Blocks this request thread until the upstream response has been fully consumed.
            future.get();
        }
        return sseEmitter;
    }
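The `data` callback above sends one SSE event per character. If one event per upstream line is preferred, the incoming chunks can be accumulated until a line break is seen. The following is a minimal JDK-only sketch of that buffering step. `LineAssembler` and its `accept` method are hypothetical names and not part of Spring or Apache HttpClient. The sketch assumes the upstream separates events with blank lines, and it drops those blank lines.

```java
import java.nio.CharBuffer;
import java.util.function.Consumer;

// Hypothetical helper: collects characters from arbitrary chunks and emits complete lines.
class LineAssembler {
    private final StringBuilder pending = new StringBuilder();
    private final Consumer<String> onLine;

    LineAssembler(Consumer<String> onLine) {
        this.onLine = onLine;
    }

    // Same parameter shape as data(CharBuffer, boolean) in the consumer above.
    void accept(CharBuffer src, boolean endOfStream) {
        while (src.hasRemaining()) {
            char c = src.get();
            if (c == '\n') {
                emit();              // a line break completes the current line
            } else if (c != '\r') {
                pending.append(c);   // carriage returns are ignored
            }
        }
        if (endOfStream && pending.length() > 0) {
            emit();                  // flush a final line that has no trailing newline
        }
    }

    private void emit() {
        String line = pending.toString();
        pending.setLength(0);
        if (!line.isEmpty()) {       // skip the blank separator lines between events
            onLine.accept(line);
        }
    }
}
```

In the handler above, the consumer passed to `LineAssembler` would be the place to call `sseEmitter.send(...)`, so that a line split across two chunks still reaches the client as a single event.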