如何将java.util.stream.Stream写成StreamingResponseBody输出流

问题描述 投票:1回答:2

我正在构建一个REST API,在这个API中,Oracle数据库中的大量数据可以通过流式传输的方式分块发送给客户端应用程序(像文件下载或直接流式传输)。

我从JpaRepository中得到的流如下--------------------------------。

@Query("select u from UsersEntity u")
Stream<UsersEntity> findAllByCustomQueryAndStream();

但现在的挑战是如何将这个流写到StreamingResponseBody输出流。

我尝试了很多方法,但都没有成功

第一方针 -

Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();

        StreamingResponseBody stream = outputStream -> {
            Iterator<UsersEntity> iterator = usersResultStream.iterator();

            try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {

                while (iterator.hasNext()) {
                    oos.write(iterator.next().toString().getBytes());
                }
            }
        };

出现错误 -

java.sql.SQLException: Closed Resultset: next
    at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:565) ~[ojdbc7-12.1.0.2.jar:12.1.0.2.0]

第二种方法 -

StreamingResponseBody stream = new StreamingResponseBody() {

            @Transactional(readOnly = true)
            @Override
            public void writeTo(OutputStream outputStream) throws IOException {

                Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();

                try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {

                    usersResultStream.forEach(user->{
                        try {
                            oos.write(user.toString().getBytes());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                }
            }
        }; 

出现错误 -

org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.

我已经在下面给出的链接中上传了练习代码--。POC链接样本

我没有任何经验与流相关的任务,所以请帮助我这个。

如果我在错误的方向比建议任何其他方法来做这个内 Spring框架. 如果有的话,请分享任何参考链接。

java spring-boot jpa-2.0
2个回答
1
投票

没有样本显示 "如此复杂 "的用法 StreamingResponseBody我担心它 "不可能"(至少我不能用StreamingResponseBody来解决它。 流查询)

.但是,什么是可能的。

  1. 使用... findAll() 在StreamingResponseBody中(正常的非流式List-repo方法)。

    但我理解 "需要 "异步完成web请求......而db请求是 "流 "的......)。

  2. 使用 Callable (异步网络请求)和一个 @Async CompletableFuture<..> (async db request):

    @RestController
    @RequestMapping("/api")
    public class UsersController {
    
       @Autowired
       private UsersRepository usersRepository;
    
       @GetMapping(value = "/async/users")
       public Callable<List<UsersEntity>> fetchUsersAsync() {
           Callable callable = () -> {
               return usersRepository.readAllBy().get();
           };
           return callable;
       }
    }
    

    ...还有一个像这样的存储库:

    @Repository
    public interface UsersRepository extends JpaRepository<UsersEntity, Integer> {
    
        @Async
        CompletableFuture<List<UsersEntity>> readAllBy();
    }
    

    (见 弹簧样品) .. 不要忘记 @EnableAsync 在你的应用程序配置上。

    @org.springframework.scheduling.annotation.EnableAsync
    @SpringBootApplication
    public class Application { ... }
    

对不起,这甚至不是一个答案, 但我的发现 - 太长了,无法评论。

异步的web请求可以通过各种方式实现。(见 https:/spring.ioblog20120510spring-mvc-3-2-preview making-a-controller-method-asynchronous。, https:/niels.nublog2016spring-async-rest.html。,甚至没有提到 "反应式 "api)


1
投票

最后,我通过使用服务层解决了这个问题。最初,我是在Controller Class中编写完整的逻辑,这就造成了问题。

控制器类 -

@RestController
@RequestMapping("/api")
public class UsersController {
    @Autowired
    private UserService service;

    @GetMapping(value = "/userstream")
    public ResponseEntity<StreamingResponseBody> fetchUsersStream() {

        StreamingResponseBody stream = this::writeTo;

        return new ResponseEntity<>(stream, HttpStatus.OK);
    }

    private void writeTo(OutputStream outputStream) {
        service.writeToOutputStream(outputStream);
    }
}

服务类 -

@Service
public class UserService {

    @Autowired
    private UsersRepository usersRepository;

    @Transactional(readOnly = true)
    public void writeToOutputStream(final OutputStream outputStream) {
        try (Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream()) {
            try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {

                usersResultStream.forEach(emp -> {
                    try {
                        oos.write(emp.toString().getBytes());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

完整的代码可以在github上找到 - https:/github.combagesh2050HttpResponseStreamingDemo。

不过,我还是愿意接受与Http流相关的建议。如果你有更好的想法,请提供。

© www.soinside.com 2019 - 2024. All rights reserved.