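I have a Spring application with an endpoint that accepts multiple files, so you can upload several files in one request. I am trying to make it faster by using threads, so I created a thread pool and tried to process each file asynchronously (on a different thread). I found a tutorial on YouTube that does almost exactly what I want, and I tried its code to see whether it works, but it does not work for me either. Here is the link to the tutorial:
https://www.youtube.com/watch?v=3rJBLFA95Io
I believe the author does not get the error because in his case the number of files always matches the number of threads. I found that if the number of tasks in my pool is equal to or less than the number of available threads, everything works fine, but if the number of tasks is greater than the number of available threads, I get this error: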
java.io.FileNotFoundException: /private/var/folders/41/81526n295q1cptcb1tbrs544h504m8/T/tomcat.9191.5294201821824312569/work/Tomcat/localhost/ROOT/upload_ff67c7fe_2f2f_44c6_8eb9_3250c8a8739b_00000003.tmp (No such file or directory)
at java.base/java.io.FileInputStream.open0(Native Method) ~[na:na]
at java.base/java.io.FileInputStream.open(FileInputStream.java:216) ~[na:na]
at java.base/java.io.FileInputStream.<init>(FileInputStream.java:157) ~[na:na]
at org.apache.tomcat.util.http.fileupload.disk.DiskFileItem.getInputStream(DiskFileItem.java:198) ~[tomcat-embed-core-9.0.63.jar:9.0.63]
at org.apache.catalina.core.ApplicationPart.getInputStream(ApplicationPart.java:100) ~[tomcat-embed-core-9.0.63.jar:9.0.63]
at org.springframework.web.multipart.support.StandardMultipartHttpServletRequest$StandardMultipartFile.getInputStream(StandardMultipartHttpServletRequest.java:251) ~[spring-web-5.3.20.jar:5.3.20]
at com.mhndev.springexecutor.service.UserService.parseCSVFile(UserService.java:47) ~[classes/:na]
at com.mhndev.springexecutor.service.UserService.saveUsers(UserService.java:29) ~[classes/:na]
at com.mhndev.springexecutor.service.UserService$$FastClassBySpringCGLIB$$c14fedc2.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.3.20.jar:5.3.20]
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:278) ~[spring-aop-5.3.20.jar:5.3.20]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
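So what I suspect is that, while a task is waiting in the pool's queue, the multipart file gets deleted for some reason before a thread is assigned to that task.
My controller: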
package com.mhndev.springexecutor.controller;

import com.mhndev.springexecutor.entity.User;
import com.mhndev.springexecutor.service.UserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@RestController
public class UserController {

    @Autowired
    private UserService userService;

    @PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
    public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
        for (MultipartFile file : files) {
            userService.saveUsers(file);
        }
        return ResponseEntity.status(HttpStatus.CREATED).build();
    }

    @GetMapping(value = "/users", produces = "application/json")
    public CompletableFuture<ResponseEntity> findAllUsers() {
        return userService.findAllUsers().thenApply(ResponseEntity::ok);
    }

    @GetMapping(value = "/getUsersByThread", produces = "application/json")
    public ResponseEntity getUsers() {
        CompletableFuture<List<User>> users1 = userService.findAllUsers();
        CompletableFuture<List<User>> users2 = userService.findAllUsers();
        CompletableFuture<List<User>> users3 = userService.findAllUsers();
        CompletableFuture.allOf(users1, users2, users3).join();
        return ResponseEntity.status(HttpStatus.OK).build();
    }
}
And my service:
package com.mhndev.springexecutor.service;

import com.mhndev.springexecutor.entity.User;
import com.mhndev.springexecutor.repository.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Service
public class UserService {

    @Autowired
    private UserRepository repository;

    Logger logger = LoggerFactory.getLogger(UserService.class);

    @Async("taskExecutor")
    public CompletableFuture<List<User>> saveUsers(MultipartFile file) throws Exception {
        long start = System.currentTimeMillis();
        List<User> users = parseCSVFile(file);
        logger.info("saving list of users of size {}, thread name : {}", users.size(), "" + Thread.currentThread().getName());
        users = repository.saveAll(users);
        long end = System.currentTimeMillis();
        logger.info("Total time {}", (end - start));
        return CompletableFuture.completedFuture(users);
    }

    @Async("taskExecutor")
    public CompletableFuture<List<User>> findAllUsers() {
        logger.info("get list of user by " + Thread.currentThread().getName());
        List<User> users = repository.findAll();
        return CompletableFuture.completedFuture(users);
    }

    private List<User> parseCSVFile(final MultipartFile file) throws Exception {
        final List<User> users = new ArrayList<>();
        try {
            try (final BufferedReader br = new BufferedReader(new InputStreamReader(file.getInputStream()))) {
                String line;
                while ((line = br.readLine()) != null) {
                    final String[] data = line.split(",");
                    final User user = new User();
                    user.setName(data[0]);
                    user.setEmail(data[1]);
                    user.setGender(data[2]);
                    users.add(user);
                }
                return users;
            }
        } catch (final IOException e) {
            logger.error("Failed to parse CSV file", e);
            // Exception messages are not SLF4J templates, so no "{}" placeholder here.
            throw new Exception("Failed to parse CSV file", e);
        }
    }
}
And the thread pool configuration:
package com.mhndev.springexecutor.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;

@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("User-Thread-");
        executor.initialize();
        return executor;
    }
}
I solved this problem by waiting on the CompletableFuture results. I changed my controller as follows:
@PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
    var res = new ArrayList<CompletableFuture<List<User>>>();
    // Submit every file first so the tasks can run in parallel.
    for (MultipartFile file : files) {
        res.add(userService.saveUsers(file));
    }
    // Block until each task has finished before the request is allowed to end.
    for (CompletableFuture<List<User>> item : res) {
        System.out.println(item.get().size());
    }
    return ResponseEntity.status(HttpStatus.CREATED).build();
}
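It seems that the request ends and the temporary files are deleted once it has finished. How did I figure this out? I added a sleep to the controller body and it worked. So I am sure the cause is that the request has already ended while there are still unfinished tasks in the thread pool's queue, and the controller does not wait for those tasks to complete. It looks like I have to wait on the CompletableFuture results.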
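The mechanism described above can be reproduced without Spring. The following is only a sketch under stated assumptions, not the actual Tomcat code path: a one-thread pool stands in for a saturated taskExecutor, plain temp files stand in for the uploaded parts, and handleRequest and process are hypothetical names. The "request" deletes its files when it returns, which is roughly what happens to multipart temp files once the real request completes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class TempFileRaceDemo {

    // Simulates one request with fileCount uploads handled by a pool of the given size.
    // Returns how many tasks failed because their file was already gone.
    static long handleRequest(int fileCount, int threads, boolean waitForTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Path> uploads = new ArrayList<>();
        List<CompletableFuture<Boolean>> results = new ArrayList<>();
        try {
            for (int i = 0; i < fileCount; i++) {
                Path upload = Files.createTempFile("upload_", ".tmp");
                Files.write(upload, "name,email,gender\n".getBytes());
                uploads.add(upload);
                results.add(CompletableFuture.supplyAsync(() -> process(upload), pool));
            }
            if (waitForTasks) {
                // The fix from the answer: do not leave the "request" until every task is done.
                CompletableFuture.allOf(results.toArray(new CompletableFuture[0])).join();
            }
            // End of the "request": the temp files are cleaned up.
            for (Path upload : uploads) {
                Files.deleteIfExists(upload);
            }
            return results.stream().filter(done -> !done.join()).count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            pool.shutdown();
        }
    }

    static boolean process(Path upload) {
        try {
            Files.readAllBytes(upload); // fails if the file has already been deleted
            Thread.sleep(200);          // stands in for the slow database save
            return true;
        } catch (IOException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("failures without waiting: " + handleRequest(2, 1, false));
        System.out.println("failures when waiting:    " + handleRequest(2, 1, true));
    }
}
```

With one worker and two files, the second task is still sitting in the queue when the files are deleted, so the first call reports at least one failure. When the request waits for all futures, no task fails.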
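I know it has been a year since you posted, but here is a solution to your problem. Someone else may have run into it and not found a way out.
You can save each MultipartFile to a file in a directory of your own. Here is the code: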
public List<Path> saveFile(MultipartFile[] files) {
    List<Path> savedFiles = new ArrayList<>();
    // Copy each upload into <java.io.tmpdir>/upload so that it outlives the request.
    Path root = Paths.get(System.getProperty("java.io.tmpdir"), "upload");
    for (MultipartFile file : files) {
        try {
            Files.createDirectories(root);
            Path filePath = root.resolve(file.getOriginalFilename());
            // Remove a stale copy if there is one; Files.delete would throw when none exists.
            Files.deleteIfExists(filePath);
            Files.copy(file.getInputStream(), filePath);
            savedFiles.add(filePath);
        } catch (FileAlreadyExistsException e) {
            throw new RuntimeException("A file of that name already exists.", e);
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
    return savedFiles;
}
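Once the files are saved, you can work with the saved files instead of the MultipartFile objects (userService.saveUsers then has to accept a Path):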
@PostMapping(value = "/users", consumes = {MediaType.MULTIPART_FORM_DATA_VALUE})
public ResponseEntity saveUsers(@RequestParam(value = "files") MultipartFile[] files) throws Exception {
    var res = new ArrayList<CompletableFuture<List<User>>>();
    List<Path> pathFiles = saveFile(files);
    for (Path file : pathFiles) {
        res.add(userService.saveUsers(file));
    }
    for (CompletableFuture<List<User>> item : res) {
        System.out.println(item.get().size());
    }
    return ResponseEntity.status(HttpStatus.CREATED).build();
}
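The controller above passes a Path to userService.saveUsers, so the service has to read from the saved copy rather than from the MultipartFile. The answer does not show that change. A minimal sketch of the parsing side, assuming the same three-column layout (name, email, gender) as in the question and using a hypothetical UserRow class in place of the User entity, might look like this:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class PathCsvParser {

    // Hypothetical stand-in for the User entity from the question.
    static final class UserRow {
        final String name;
        final String email;
        final String gender;

        UserRow(String name, String email, String gender) {
            this.name = name;
            this.email = email;
            this.gender = gender;
        }
    }

    // Parses the saved copy, then deletes it whether or not parsing succeeded.
    static List<UserRow> parseAndDelete(Path file) {
        List<UserRow> users = new ArrayList<>();
        try (BufferedReader br = Files.newBufferedReader(file, StandardCharsets.UTF_8)) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] data = line.split(",");
                if (data.length < 3) {
                    continue; // skip blank or incomplete lines
                }
                users.add(new UserRow(data[0].trim(), data[1].trim(), data[2].trim()));
            }
            return users;
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to parse CSV file " + file, e);
        } finally {
            try {
                Files.deleteIfExists(file);
            } catch (IOException ignored) {
                // best-effort cleanup of the saved copy
            }
        }
    }

    // Small helper for trying the parser out: writes sample content to a temp file.
    static Path writeTemp(String content) {
        try {
            Path p = Files.createTempFile("users_", ".csv");
            Files.write(p, content.getBytes(StandardCharsets.UTF_8));
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Deleting the copy in the finally block is one way to keep the upload directory from filling up. Whether the file should be kept after a failed parse is a design choice this sketch does not settle.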
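When a multipart field of a POJO is used inside a CompletableFuture, the file is stored in a temporary directory, but by the time the file is accessed the temporary file can no longer be found, hence the FileNotFoundException.
Solution:
I fetched the byte[] of each file and stored it in a HashMap<String,byte[]> before calling CompletableFuture.supplyAsync(() -> { /* business logic implementation */ }).
Hope this helps.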
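A minimal sketch of that idea in plain Java, assuming a hypothetical map of file name to InputStream in place of the MultipartFile objects (with Spring, MultipartFile.getBytes() would supply the bytes). The content is copied into memory on the request thread, so the asynchronous task no longer depends on the temp file. Each file is held fully in memory, so this only suits reasonably small uploads.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class InMemoryUploadDemo {

    // Copies every stream into a byte[] on the calling (request) thread.
    static Map<String, byte[]> readAll(Map<String, InputStream> uploads) {
        Map<String, byte[]> contents = new HashMap<>();
        for (Map.Entry<String, InputStream> entry : uploads.entrySet()) {
            try (InputStream in = entry.getValue()) {
                contents.put(entry.getKey(), in.readAllBytes());
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        }
        return contents;
    }

    // Counts non-blank lines per file asynchronously, using only the in-memory copies.
    static CompletableFuture<Map<String, Long>> countLinesAsync(Map<String, byte[]> contents) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, Long> counts = new HashMap<>();
            contents.forEach((name, bytes) -> counts.put(name,
                    new String(bytes, StandardCharsets.UTF_8).lines()
                            .filter(line -> !line.isBlank())
                            .count()));
            return counts;
        });
    }

    // Example wiring with two in-memory "uploads".
    static Map<String, Long> demo() {
        Map<String, InputStream> uploads = new HashMap<>();
        uploads.put("a.csv", new ByteArrayInputStream("x,y,z\n1,2,3\n".getBytes(StandardCharsets.UTF_8)));
        uploads.put("b.csv", new ByteArrayInputStream("x,y,z\n".getBytes(StandardCharsets.UTF_8)));
        return countLinesAsync(readAll(uploads)).join();
    }
}
```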
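My 2 cents on this question. The answers from @Majid and @Giovani both have a problem.
Problem: the response is only sent after all files have been processed.
I think that defeats the purpose of being asynchronous.
[I am still not sure why the MultipartFile cannot be read from a reused thread. (If you find out, please let me know.)]
I solved the problem above by first transferring each upload to a file of my own and processing that file asynchronously; the code follows the logs.
The same can be achieved by storing the multipart file in a cache or a database.
Logs: (note that "[INFO] - Done processing - sending response back" is logged, after which the threads keep working asynchronously)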
[INFO] - storing files
[INFO] - -- saving file MOCK_DATA.csv
[INFO] - -- saving file MOCK_DATA (1).csv
[INFO] - -- saving file MOCK_DATA (2).csv
[INFO] - userThread-1 > processing file MOCK_DATA.csv
[INFO] - > parsing csv file MOCK_DATA.csv of size 62582
[INFO] - Done processing - sending response back
[INFO] - userThread-2 > processing file MOCK_DATA (1).csv
[INFO] - > parsing csv file MOCK_DATA (1).csv of size 62414
[INFO] - userThread-1 > saved all 1000 users
[INFO] - userThread-2 > saved all 1000 users
[INFO] - userThread-1 > total users : 2001
[INFO] - userThread-1 > deleting file : MOCK_DATA.csv
[INFO] - >Thread is free userThread-1
[INFO] - userThread-1 > processing file MOCK_DATA (2).csv
[INFO] - > parsing csv file MOCK_DATA (2).csv of size 62541
[INFO] - userThread-2 > total users : 2001
[INFO] - userThread-2 > deleting file : MOCK_DATA (1).csv
[INFO] - >Thread is free userThread-2
[INFO] - userThread-1 > saved all 1000 users
[INFO] - userThread-1 > total users : 3001
[INFO] - userThread-1 > deleting file : MOCK_DATA (2).csv
[INFO] - >Thread is free userThread-1
@PostMapping("/")
public ResponseEntity<String> populateTable(@RequestParam MultipartFile[] files) throws Exception {
    log.info("storing files");
    for (MultipartFile f : files) {
        log.info(" -- saving file {}", f.getOriginalFilename());
        // Copy the upload to a file of our own before the request ends.
        File tempFile = new File(f.getOriginalFilename());
        f.transferTo(tempFile.toPath());
        // Asynchronous call: returns immediately, the work continues on the pool.
        service.saveFile(tempFile);
    }
    log.info("Done processing - sending response back");
    return ResponseEntity.status(HttpStatus.CREATED).body("Populated files");
}

@Async
public CompletableFuture<Integer> saveFile(final File file) throws Exception {
    String threadName = Thread.currentThread().getName();
    log.info(threadName + " > processing file {}", file.getName());
    List<User> users = parseCsvFile(file);
    userRepo.saveAll(users);
    log.info(threadName + " > saved all {} users", users.size());
    log.info(threadName + " > total users : {}", getUserCount());
    log.info(threadName + " > deleting file : {}", file.getName());
    log.info(">Thread is free: {}", threadName);
    file.delete();
    return CompletableFuture.completedFuture(users.size());
}

private List<User> parseCsvFile(final File file) throws Exception {
    final List<User> users = new ArrayList<>();
    log.info(" > parsing csv file {} of size {} ", file.getName(), file.length());
    try {
        try (final BufferedReader br = new BufferedReader(new FileReader(file))) {
            String line;
            br.readLine(); // skipping first line (header)
            while ((line = br.readLine()) != null) {
                final String[] data = line.split(",");
                final User u = new User();
                u.setFirstName(data[1]);
                u.setLastName(data[2]);
                u.setEmail(data[3]);
                u.setGender(data[4]);
                u.setIp(data[5]);
                users.add(u);
            }
        }
    } catch (IOException e) {
        log.error(" >< something happened");
        e.printStackTrace();
    } catch (Exception e) {
        log.error(" >< something happened with user size {}",
                CollectionUtils.isEmpty(users) ? 0 : users.get(users.size() - 1));
        e.printStackTrace();
    }
    return users;
}
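One detail in the controller above is worth a second look: new File(f.getOriginalFilename()) is a relative path built from the client-supplied name, so it normally resolves against the process's working directory and two uploads with the same name can overwrite each other. A possible variation, sketched here with plain JDK calls and hypothetical names (stage, processAsync), stages each upload in a uniquely named temp file and deletes it in a finally block so that it is removed even when processing fails:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class StagedUploadDemo {

    // Copies the upload into a uniquely named temp file on the request thread.
    static Path stage(InputStream in) {
        try {
            Path staged = Files.createTempFile("upload-", ".csv");
            // createTempFile already created the (empty) file, so it must be replaced.
            Files.copy(in, staged, StandardCopyOption.REPLACE_EXISTING);
            return staged;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Counts the data rows (everything after the header line) on the given executor,
    // then removes the staged file regardless of the outcome.
    static CompletableFuture<Integer> processAsync(Path staged, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                List<String> lines = Files.readAllLines(staged, StandardCharsets.UTF_8);
                return Math.max(0, lines.size() - 1);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                try {
                    Files.deleteIfExists(staged);
                } catch (IOException ignored) {
                    // best-effort cleanup
                }
            }
        }, executor);
    }
}
```

In a Spring controller, stage would be called with f.getInputStream() for each upload before returning the response, and the executor would be the configured task executor. That wiring is an assumption and is not part of the answer above.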