Webflux 使用 zip 传输 DataBuffer 导致文件损坏

问题描述 投票:0回答:4

目前我正在使用 Spring Boot 2.7.12 和 Webflux 实现一个非阻塞 I/O 应用程序,以使用 Web 客户端下载文件,压缩它们并将 zip 文件流式传输到浏览器。 如果我将 ZipOutputStream 写入本地文件,下载和压缩工作正常。 但是,如果我将 zip 流式传输回调用者 (Flux),它就会损坏。

我不确定,是否我误解了DataBuffer的概念,或者它是否是spring框架中的一个bug。

我创建了一个小样本。如果您下载 zip 文件,每个条目都会重复多次,并且最后一个条目已损坏。

谢谢你 罗伯托

@GetMapping(value = "/zip", produces = "application/zip")
  public Flux<DefaultDataBuffer> zip() {
    var files = Arrays.asList("File1", "File2", "File3", "File4", "File5");
    var responseDataBuffer = new DefaultDataBufferFactory().allocateBuffer();
    ZipOutputStream zipOutputStream = new ZipOutputStream(responseDataBuffer.asOutputStream());
    return Flux.fromStream(files.stream())
        .map(file -> putZipEntry(file, zipOutputStream))
        .map(x -> responseDataBuffer)
        .doOnComplete(() -> closeZipOutputStream(zipOutputStream));
  }

  private void closeZipOutputStream(ZipOutputStream zipOutputStream) {
    try {
    zipOutputStream.close();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  private ZipOutputStream putZipEntry(String file, ZipOutputStream zipOutputStream) {
    try {
      zipOutputStream.putNextEntry(new ZipEntry(file + ".txt"));
      zipOutputStream.write(file.getBytes());
      zipOutputStream.closeEntry();
      return zipOutputStream;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
spring spring-webflux nonblocking
4个回答
1
投票

为什么这么多废话?

private static final String ZIP_FILE = "UEsDBAoAAAAAAN1s5lYZ+CeZBgAAAAYAAAAJAAAAdGVzdDEudHh0VGVzdCAxUEsDBAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJAAAAdGVzdDIudHh0VGVzdCAyUEsDBAoAAAAAAPhs5lY1mSl3BgAAAAYAAAAJAAAAdGVzdDMudHh0VGVzdCAzUEsDBAoAAAAAAPts5laWDE3pBgAAAAYAAAAJAAAAdGVzdDQudHh0VGVzdCA0UEsDBAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJAAAAdGVzdDUudHh0VGVzdCA1UEsBAj8ACgAAAAAA3WzmVhn4J5kGAAAABgAAAAkAJAAAAAAAAAAgAAAAAAAAAHRlc3QxLnR4dAoAIAAAAAAAAQAYAN4h/3L+r9kBCLkhXwKw2QEkqm9t/q/ZAVBLAQI/AAoAAAAAAPVs5lajqS4ABgAAAAYAAAAJACQAAAAAAAAAIAAAAC0AAAB0ZXN0Mi50eHQKACAAAAAAAAEAGAD2Z+2M/q/ZAVPgIV8CsNkBbwSLdP6v2QFQSwECPwAKAAAAAAD4bOZWNZkpdwYAAAAGAAAACQAkAAAAAAAAACAAAABaAAAAdGVzdDMudHh0CgAgAAAAAAABABgAGsOxkP6v2QHcByJfArDZAW2Je3b+r9kBUEsBAj8ACgAAAAAA+2zmVpYMTekGAAAABgAAAAkAJAAAAAAAAAAgAAAAhwAAAHRlc3Q0LnR4dAoAIAAAAAAAAQAYAKkIkZT+r9kBWlUiXwKw2QGkwld4/q/ZAVBLAQI/AAoAAAAAAABt5lYAPEqeBgAAAAYAAAAJACQAAAAAAAAAIAAAALQAAAB0ZXN0NS50eHQKACAAAAAAAAEAGADtvU2Y/q/ZAZ/xIl8CsNkBEj5/d/6v2QFQSwUGAAAAAAUABQDHAQAA4QAAAAAA";

private static final int BUFFER_SIZE = 10;

@GetMapping(value = "/zip", produces = "application/zip")
public Flux<DataBuffer> test() {
    return DataBufferUtils.readInputStream(() -> new ByteArrayInputStream(Base64.getDecoder().decode(ZIP_FILE)), new DefaultDataBufferFactory(), BUFFER_SIZE);
}

0
投票

getFluxPublisherFunction
不正确,逻辑没有意义。它正在做的是

DataBufferUtils.write(源,outputStream)

source 是来自 ZIP_FILE 的 DataBuffer 的 Flux。 OutputStream 是另一个用单独的缓冲区(名为defaultDataBuffer)创建的数据缓冲区。 此方法返回与源中相同的缓冲区

.map(缓冲区 -> defaultDataBuffer)

对于每个缓冲区(ZIP_file 的块),它返回完整的估计缓冲区,该缓冲区正在使用写入操作进行填充。任何时候的目标缓冲区都可能包含 ZIP_FILE 的多个部分之一。

  1. 为什么代码可以使用 1000 个缓冲区,因为这足以容纳 ZIP_FILE 的全部内容。当订阅
    getRead
    时,本例中的
    DataBufferUtils.write(source, outputStream)
    方法返回单个 DataBuffer。当缓冲区大小很小时,这不起作用,因为您返回的是 defaultDataBuffer 中累积的部分块。
  2. 要将 zip 返回到浏览器,只需返回由
    getRead
    方法创建的具有不同缓冲区的 Flux 即可。它将在缓冲区大小为 10 或 100 或 1000 时工作。

0
投票

我不确定 Spring 如何处理这个问题,但据我了解,它不知道您何时完成写入,并且在写入第一个块后立即返回缓冲区。所以我想这也是接收者所看到的。在这种情况下,您需要的是仅在数据缓冲区完全写入时才返回数据缓冲区。

试试这个:

return Flux.fromStream(files.stream())
    .map(file -> putZipEntry(file, zipOutputStream))
    .then(Mono.fromCallable { 
        closeZipOutputStream(zipOutputStream);
        responseDataBuffer
   })

PS 在您的情况下,使用 Reactor 没有意义,因为您将其写入单个缓冲区并等待它完成。如果您能以块的形式产生响应,那就更好了,但我不知道如何使用标准

ZipOutputStream
来做到这一点。但是,由于它已经生成了
OutputStream
,因此按原样生成它会更容易(使用
InputStreamResource
ResponseEntity


0
投票

我最近遇到了一个类似的问题,想要从多个 URL 中压缩 Flux。问题在于 ZipOutputStream 是如何关闭的。它缺少文件末尾。因此腐败。

尝试在 zip 流上调用 finish() 并将最后几个字节添加到 Flux 中。

© www.soinside.com 2019 - 2024. All rights reserved.