目前我正在使用 Spring Boot 2.7.12 和 Webflux 实现一个非阻塞 I/O 应用程序,以使用 Web 客户端下载文件,压缩它们并将 zip 文件流式传输到浏览器。 如果我将 ZipOutputStream 写入本地文件,下载和压缩工作正常。 但是,如果我将 zip 流式传输回调用者 (Flux),它就会损坏。
我不确定,是否我误解了DataBuffer的概念,或者它是否是spring框架中的一个bug。
我创建了一个小样本。如果您下载 zip 文件,每个条目都会重复多次,并且最后一个条目已损坏。
谢谢你 罗伯托
@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DefaultDataBuffer> zip() {
var files = Arrays.asList("File1", "File2", "File3", "File4", "File5");
var responseDataBuffer = new DefaultDataBufferFactory().allocateBuffer();
ZipOutputStream zipOutputStream = new ZipOutputStream(responseDataBuffer.asOutputStream());
return Flux.fromStream(files.stream())
.map(file -> putZipEntry(file, zipOutputStream))
.map(x -> responseDataBuffer)
.doOnComplete(() -> closeZipOutputStream(zipOutputStream));
}
private void closeZipOutputStream(ZipOutputStream zipOutputStream) {
try {
zipOutputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
private ZipOutputStream putZipEntry(String file, ZipOutputStream zipOutputStream) {
try {
zipOutputStream.putNextEntry(new ZipEntry(file + ".txt"));
zipOutputStream.write(file.getBytes());
zipOutputStream.closeEntry();
return zipOutputStream;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
为什么这么多废话?
private static final String ZIP_FILE = "UEsDBAoAAAAAAN1s5lYZ+CeZBgAAAAYAAAAJAAAAdGVzdDEudHh0VGVzdCAxUEsDBAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJAAAAdGVzdDIudHh0VGVzdCAyUEsDBAoAAAAAAPhs5lY1mSl3BgAAAAYAAAAJAAAAdGVzdDMudHh0VGVzdCAzUEsDBAoAAAAAAPts5laWDE3pBgAAAAYAAAAJAAAAdGVzdDQudHh0VGVzdCA0UEsDBAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJAAAAdGVzdDUudHh0VGVzdCA1UEsBAj8ACgAAAAAA3WzmVhn4J5kGAAAABgAAAAkAJAAAAAAAAAAgAAAAAAAAAHRlc3QxLnR4dAoAIAAAAAAAAQAYAN4h/3L+r9kBCLkhXwKw2QEkqm9t/q/ZAVBLAQI/AAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJACQAAAAAAAAAIAAAAC0AAAB0ZXN0Mi50eHQKACAAAAAAAAEAGAD2Z+2M/q/ZAVPgIV8CsNkBbwSLdP6v2QFQSwECPwAKAAAAAAD4bOZWNZkpdwYAAAAGAAAACQAkAAAAAAAAACAAAABaAAAAdGVzdDMudHh0CgAgAAAAAAABABgAGsOxkP6v2QHcByJfArDZAW2Je3b+r9kBUEsBAj8ACgAAAAAA+2zmVpYMTekGAAAABgAAAAkAJAAAAAAAAAAgAAAAhwAAAHRlc3Q0LnR4dAoAIAAAAAAAAQAYAKkIkZT+r9kBWlUiXwKw2QGkwld4/q/ZAVBLAQI/AAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJACQAAAAAAAAAIAAAALQAAAB0ZXN0NS50eHQKACAAAAAAAAEAGADtvU2Y/q/ZAZ/xIl8CsNkBEj5/d/6v2QFQSwUGAAAAAAUABQDHAQAA4QAAAAAA";
private static final int BUFFER_SIZE = 10;
@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DataBuffer> test() {
return DataBufferUtils.readInputStream(() -> new ByteArrayInputStream(Base64.getDecoder().decode(ZIP_FILE)), new DefaultDataBufferFactory(), BUFFER_SIZE);
}
getFluxPublisherFunction
不正确,逻辑没有意义。它正在做的是
DataBufferUtils.write(源,outputStream)
source 是来自 ZIP_FILE 的 DataBuffer 的 Flux。 OutputStream 是另一个用单独的缓冲区(名为defaultDataBuffer)创建的数据缓冲区。 此方法返回与源中相同的缓冲区
.map(缓冲区 -> defaultDataBuffer)
对于每个缓冲区(ZIP_file 的块),它返回完整的估计缓冲区,该缓冲区正在使用写入操作进行填充。任何时候的目标缓冲区都可能包含 ZIP_FILE 的多个部分之一。
getRead
时,本例中的 DataBufferUtils.write(source, outputStream)
方法返回单个 DataBuffer。当缓冲区大小很小时,这不起作用,因为您返回的是 defaultDataBuffer 中累积的部分块。getRead
方法创建的具有不同缓冲区的 Flux 即可。它将在缓冲区大小为 10 或 100 或 1000 时工作。我不确定 Spring 如何处理这个问题,但据我了解,它不知道您何时完成写入,并且在写入第一个块后立即返回缓冲区。所以我想这也是接收者所看到的。在这种情况下,您需要的是仅在数据缓冲区完全写入时才返回数据缓冲区。
试试这个:
return Flux.fromStream(files.stream())
.map(file -> putZipEntry(file, zipOutputStream))
.then(Mono.fromCallable {
closeZipOutputStream(zipOutputStream);
responseDataBuffer
})
PS 在您的情况下,使用 Reactor 没有意义,因为您将其写入单个缓冲区并等待它完成。如果您能以块的形式产生响应,那就更好了,但我不知道如何使用标准
ZipOutputStream
来做到这一点。但是,由于它已经生成了 OutputStream
,因此按原样生成它会更容易(使用 InputStreamResource
和 ResponseEntity
)
我最近遇到了一个类似的问题,想要从多个 URL 中压缩 Flux。问题在于 ZipOutputStream 是如何关闭的。它缺少文件末尾。因此腐败。
尝试在 zip 流上调用 finish() 并将最后几个字节添加到 Flux 中。