Java Spring,临时多部分文件何时被删除?

问题描述 投票:0回答:4

我有一个 Spring 应用程序,我有一个接受多个文件的端点,因此您可以上传多个文件。 我试图通过利用线程使其工作得更快,所以我所做的是创建一个 ThreadPool 并尝试异步处理每个文件(在不同的线程中)。 实际上,我在 youtube 上找到了一个教程,它几乎做了我想要的事情,我想尝试他的代码,看看它是否有效,但它对我来说也不起作用。 这是教程的链接。

https://www.youtube.com/watch?v=3rJBLFA95Io 我相信他不会收到错误,因为在他的情况下文件和线程的数量总是匹配的。 我发现,如果我的池中的任务数量等于或小于可用线程数量,那么一切都工作得很好,但是如果任务数量或大于可用线程数量,我会收到一个错误:

java.io.FileNotFoundException: /private/var/folders/41/81526n295q1cptcb1tbrs544h504m8/T/tomcat.9191.5294201821824312569/work/Tomcat/localhost/ROOT/upload_ff67c7fe_2f2f_44c6_8eb9_3250c8a8739b_00000003.tmp (No such file or directory)
    at java.base/java.io.FileInputStream.open0(Native Method) ~[na:na]
    at java.base/java.io.FileInputStream.open(FileInputStream.java:216) ~[na:na]
    at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157) ~[na:na]
    at org.apache.tomcat.util.http.fileupload.disk.DiskFileItem.getInputStream(DiskFileItem.java:198) ~[tomcat-embed-core-9.0.63.jar:9.0.63]
    at org.apache.catalina.core.ApplicationPart.getInputStream(ApplicationPart.java:100) ~[tomcat-embed-core-9.0.63.jar:9.0.63]
    at org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile.getInputStream(StandardMultipartHttpServletRequest.java:251) ~[spring-web-5.3.20.jar:5.3.20]
    at com.mhndev.springexecutor.service.UserService.parseCSVFile(UserService.java:47) ~[classes/:na]
    at com.mhndev.springexecutor.service.UserService.saveUsers(UserService.java:29) ~[classes/:na]
    at com.mhndev.springexecutor.service.UserService$$FastClassBySpringCGLIB$$c14fedc2.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.3.20.jar:5.3.20]
    at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:278) ~[spring-aop-5.3.20.jar:5.3.20]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

所以我怀疑的是,当任务保留在池中时,多部分文件会在该任务出于某种原因拥有分配的线程之前被删除。

我的控制器:

package com.mhndev.springexecutor.controller;

import com.mhndev.springexecutor.entity.User;
import com.mhndev.springexecutor.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

import java.util.List;
import java.util.concurrent.CompletableFuture;

@RestController
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
    public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
        for(MultipartFile file: files) {
            userService.saveUsers(file);
        }

        return ResponseEntity.status(HttpStatus.CREATED).build();
    }

    @GetMapping(value = "/users", produces = "application/json")
    public CompletableFuture<ResponseEntity> findAllUsers() {
        return userService.findAllUsers().thenApply(ResponseEntity::ok);
    }

    @GetMapping(value = "/getUsersByThread", produces = "application/json")
    public  ResponseEntity getUsers(){
        CompletableFuture<List<User>> users1=userService.findAllUsers();
        CompletableFuture<List<User>> users2=userService.findAllUsers();
        CompletableFuture<List<User>> users3=userService.findAllUsers();
        CompletableFuture.allOf(users1,users2,users3).join();
        return ResponseEntity.status(HttpStatus.OK).build();
    }
}

还有我的服务:

package com.mhndev.springexecutor.service;

import com.mhndev.springexecutor.entity.User;
import com.mhndev.springexecutor.repository.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Service
public class UserService {

    @Autowired
    private UserRepository repository;
    Logger logger = LoggerFactory.getLogger(UserService.class);

    @Async("taskExecutor")
    public CompletableFuture<List<User>> saveUsers(MultipartFile file) throws Exception {
        long start = System.currentTimeMillis();
        List<User> users = parseCSVFile(file);
        logger.info("saving list of users of size {}, thread name : {}", users.size(), "" + Thread.currentThread().getName());
        users = repository.saveAll(users);
        long end = System.currentTimeMillis();
        logger.info("Total time {}", (end - start));
        return CompletableFuture.completedFuture(users);
    }

    @Async("taskExecutor")
    public CompletableFuture<List<User>> findAllUsers(){
        logger.info("get list of user by " + Thread.currentThread().getName());
        List<User> users = repository.findAll();
        return CompletableFuture.completedFuture(users);
    }

    private List<User> parseCSVFile(final MultipartFile file) throws Exception {
        final List<User> users = new ArrayList<>();
        try {
            try (final BufferedReader br = new BufferedReader(new InputStreamReader(file.getInputStream()))) {
                String line;
                while ((line = br.readLine()) != null) {
                    final String[] data = line.split(",");
                    final User user = new User();
                    user.setName(data[0]);
                    user.setEmail(data[1]);
                    user.setGender(data[2]);
                    users.add(user);
                }
                return users;
            }
        } catch (final IOException e) {
            logger.error("Failed to parse CSV file", e);
            throw new Exception("Failed to parse CSV file {}", e);
        }
    }

}

以及线程池的配置:

package com.mhndev.springexecutor.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("User-Thread-");
        executor.initialize();

        return executor;
    }
}

java spring threadpool multipartform-data java-threads
4个回答
1
投票

我使用 CompletableFuture 解决了这个问题。 所以我改变了我的控制器如下:

    @PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
    public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
         var res = new ArrayList<CompletableFuture<List<User>>>();
        for(MultipartFile file: files) {
            res.add(userService.saveUsers(file));
        }

        for(CompletableFuture<List<User>> item: res) {
            System.out.println(item.get().size());
        }

        return ResponseEntity.status(HttpStatus.CREATED).build();
    }

似乎请求的会话结束了,然后在请求的会话结束后删除了临时文件。 你知道我是怎么想出来的吗?我在控制器主体中添加了一个睡眠,它起作用了,所以我确定这是因为请求已经结束,但是线程池队列中有一些未完成的任务,并且控制器不会等待任务完成,看起来我必须使用 comptable future。


0
投票

我知道你发帖已经一年了,但这里有一个解决你问题的方法。可能有人经历过,却没有找到出路。

您可以将 MultipartFile 保存到目录中的文件中。这是代码:

public List<Path> saveFile(MultipartFile[] files) {

    List<Path> files = new ArrayList<>();

    for(MultipartFile file: files) {

        try {

            Path root = Paths.get(System.getProperty("java.io.tmpdir") + "/upload/");
            Files.createDirectories(root);
            Path filePath = this.root.resolve(""file.getOriginalFilename());

            Files.delete(filePath);
            Files.copy(file.getInputStream(), filePath);

            files.add(filePath)

        } catch (Exception e) {
            if (e instanceof FileAlreadyExistsException) {
                throw new RuntimeException("A file of that name already exists.");
            }

            throw new RuntimeException(e.getMessage());
        }
    }

    return files;
}

保存后,您可以使用保存的文件而不是 MultipartFile

@PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {

    var res = new ArrayList<CompletableFuture<List<User>>>();
     
    List<Path> pathFiles = saveFile(files);
     
    for(Path file: pathFiles) {
        res.add(userService.saveUsers(file));
    }

    for(CompletableFuture<List<User>> item: res) {
        System.out.println(item.get().size());
    }

    return ResponseEntity.status(HttpStatus.CREATED).build();
}

0
投票

在 CompletableFuture 中使用任何 POJO 对象中的多部分字段时,会将文件存储在临时目录中,但访问该文件时,找不到临时目录,因此找不到异常文件。

解决方案:

在调用

HashMap<String,byte[]>
 之前实现了 
bytes[]
 并存储了 
CompletableFuture.supplyAsync(() - { //Business Logic Implementation })

希望这有帮助。


0
投票

我对这个问题的 2 美分。 @Majid@Giovani 的答案都有问题。

问题:在处理完所有文件后才会给出回复。

我认为这违背了异步的目的。

[我仍然不确定为什么不能从重用的线程中读取 MultipartFile。 (如果你发现了请告诉)]


我解决了上述问题:

  1. 从其余调用中读取多部分文件
  2. 在当前目录中保存临时文件副本
  3. 将文件 obj 给线程
  4. 线程将异步读取数据并将其存储在 h2 db 中
  5. 线程将删除临时文件
PS:

同样可以通过将MultiPart文件保存在缓存或数据库中来实现


日志:(注意“[INFO] - 完成处理 - 发送回响应”被调用,之后线程继续异步工作)

[INFO] - storing files [INFO] - -- saving file MOCK_DATA.csv [INFO] - -- saving file MOCK_DATA (1).csv [INFO] - -- saving file MOCK_DATA (2).csv [INFO] - userThread-1 > processing file MOCK_DATA.csv [INFO] - > parsing csv file MOCK_DATA.csv of size 62582 [INFO] - Done processing - sending response back [INFO] - userThread-2 > processing file MOCK_DATA (1).csv [INFO] - > parsing csv file MOCK_DATA (1).csv of size 62414 [INFO] - userThread-1 > saved all 1000 users [INFO] - userThread-2 > saved all 1000 users [INFO] - userThread-1 > total users : 2001 [INFO] - userThread-1 > deleting file : MOCK_DATA.csv [INFO] - >Thread is free userThread-1 [INFO] - userThread-1 > processing file MOCK_DATA (2).csv [INFO] - > parsing csv file MOCK_DATA (2).csv of size 62541 [INFO] - userThread-2 > total users : 2001 [INFO] - userThread-2 > deleting file : MOCK_DATA (1).csv [INFO] - >Thread is free userThread-2 [INFO] - userThread-1 > saved all 1000 users [INFO] - userThread-1 > total users : 3001 [INFO] - userThread-1 > deleting file : MOCK_DATA (2).csv [INFO] - >Thread is free userThread-1


控制器方法如下:

@PostMapping("/") public ResponseEntity<String> populateTable(@RequestParam MultipartFile files[]) throws Exception { log.info("storing files"); for (MultipartFile f : files) { log.info(" -- saving file {}", f.getOriginalFilename()); File tempFile = new File(f.getOriginalFilename()); f.transferTo(tempFile.toPath()); service.saveFile(tempFile); } log.info("Done processing - sending response back"); return ResponseEntity.status(HttpStatus.CREATED).body("Populated files"); }


以下是saveFile方法:

@Async public CompletableFuture<Integer> saveFile(final File file) throws Exception { String threadName = Thread.currentThread().getName(); log.info(threadName + " > processing file {}", file.getName()); List<User> users = parseCsvFile(file); userRepo.saveAll(users); log.info(threadName + " > saved all {} users", users.size()); log.info(threadName + " > total users : {}", getUserCount()); log.info(threadName + " > deleting file : {}", file.getName()); log.info(">Thread is free: {}", threadName); file.delete(); return CompletableFuture.completedFuture(users.size()); }


下面是parseCsvFile方法:

private List<User> parseCsvFile(final File file) throws Exception { final List<User> users = new ArrayList<>(); log.info(" > parsing csv file {} of size {} ", file.getName(), file.length()); try { try(final BufferedReader br = new BufferedReader(new FileReader(file))){ String line; br.readLine();//skipping first line while ((line = br.readLine()) != null) { final String data[] = line.split(","); final User u = new User(); u.setFirstName(data[1]); u.setLastName(data[2]); u.setEmail(data[3]); u.setGender(data[4]); u.setIp(data[5]); users.add(u); } } } catch (IOException e) { log.error(" >< something happened"); e.printStackTrace(); } catch (Exception e) { log.error(" >< something happened with user size {}", CollectionUtils.isEmpty(users) ? 0 : users.get(users.size() - 1)); e.printStackTrace(); } return users; }
    
© www.soinside.com 2019 - 2024. All rights reserved.