在quarkus中使用mutinity解码base64

问题描述 投票:0回答:1

我正在研究 quarkus 3 的 Mutiny 框架,我想知道这些代码是否正确,或者有人可能比我更好。我想读取一个大的 base64 字符串并将它们解码为一个字符串以保存在文件中(将来可能是一个文件)。我想一次读取 4 个字节(因为编码是 Base64 ),但以最好的多线程方式。 这是正确的代码还是错误的?如何?谢谢!

  @Inject
    ManagedExecutor managedExecutor;

 List<byte[]> resultDecoded = readBase64InStreamingFashionAndWriteTo(resultEncoded).collect().asList().await().indefinitely();
        //trasformao byte
        String resultMerge = resultDecoded.stream().map(b -> new String(b, StandardCharsets.UTF_8)).reduce("", (resultString, partialStr) -> resultString + partialStr);
        logger.info("Result Decoded: " + resultMerge);

public Multi<byte[]> readBase64InStreamingFashionAndWriteTo(String base64Str) throws IOException {
        logger.info("start thread read " + Thread.currentThread());
        AtomicInteger i = new AtomicInteger(1);
        return Uni.createFrom().item(base64Str)
               .emitOn(managedExecutor) //work pool
                .log()
               .onItem().transform(String::getBytes)
               .onItem().transform(ByteArrayInputStream::new)
               .onItem().transformToMulti(this::readChunkAsync)// processo asincrono
                .emitOn(managedExecutor)
                .onItem().transform( str -> {
                   if (i.getAndAdd(1) <= 2) logger.info(Thread.currentThread() + "**read partial string**: " + str);
                   return  str;})
               .onItem().transform(DatatypeConverter::parseBase64Binary);


    }

    private Multi<String> readChunkAsync(ByteArrayInputStream in) {
        return Multi.createFrom().emitter( em -> {  //emetto multi di stringhe (stream di stringhe generate leggendo 4 byte alla volta dal base64 in input)
                  try {
                      AtomicInteger i = new AtomicInteger(1);
                      byte[] buf = new byte[4];//4 bytes
                      int nread = -1;
                      while ((nread = in.read(buf, 0, buf.length)) != -1) {
                          if (i.getAndAdd(1) <= 2)  logger.info(Thread.currentThread() + "**emiting**: ");
                          em.emit(new String(buf, StandardCharsets.UTF_8));
                      }
                  }catch(Exception ex){
                      //
                  } finally {
                      em.complete();
                  }
                }
        );
    }
base64 quarkus java-11 mutiny quarkus-reactive
1个回答
0
投票

这可行,但由于任何操作中都没有异步 I/O(请参阅

onItem().transform(...)
,因此命令式循环中的相同代码将比那些运算符链接更有效。

© www.soinside.com 2019 - 2024. All rights reserved.