我写了一个spring-boot应用程序,它从用户那里收到一个名为Calc的对象,它包含两个参数,并返回一个包含复杂计算的答案(计算本身与问题无关)。因为系统可能正忙,所以每个对象都进入队列,并且有一个调度程序按顺序在队列中传递,并执行计算。
我的问题是如何将项目计算的结果返回给正确的请求。
我已经包含了我写的代码:
控制器:
@RestController
public class CalcController {
@Autowired
private CalculateService calculateService;
@RequestMapping("/")
public int calculate(@RequestBody Calc calc) {
return calculateService.calculate(calc);
}
}
计算对象:
@Data
public class Calc {
private int paramA;
private int paramB;
}
CalculateService:
@Service
public class CalculateService {
private BlockingQueue<Calc> calcQueue;
@PostConstruct
private void init() {
calcQueue = new LinkedBlockingDeque<>();
}
public int calculate(Calc calc) {
calcQueue.add(calc);
// TODO: Return calculation result.
return 0;
}
@Scheduled(fixedRate = 2000)
public void calculateQueue() throws InterruptedException {
while (!calcQueue.isEmpty()) {
Calc calc = calcQueue.take();
int result = Calculator.calculate(calc);
// TODO: Return calculation result to the right request.
}
}
}
谢谢
您可以使用基本上维护内部队列的ExecutorService
将您的工作请求分派给多个线程。
class Service {
// use 4 threads; optimal is the amount of processor cores available
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);
public int calculate(int input) {
Future<Integer> future = EXECUTOR.submit(() -> Calculator.calculate(input));
return future.get(); // this actually thrown an Exception you have to catch
}
}
当执行者有一个可用线程时,submit
将调用Calculator.calculate()
,future.get()
将提取该调用的实际结果。
请注意,此代码会一直阻塞,直到计算结果为止,只有计算本身才是并行的。如果您确实希望立即返回并在以后提供结果,那么这是一个不同的故事,但它并不真正适合REST控制器概念。
您还可以使用CompletableFuture
使此代码更简单
class Service {
public int calculate(int input) {
return CompletableFuture.supplyAsync(() -> Calculator.calculate(input)).get();
}
}
你可以使用Spring的@Async能力
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @EnableAsync @Configuration public class ThreadConfig { @Bean(name = "threadPoolTaskExecutor") public Executor threadPoolTaskExecutor() { return new ThreadPoolTaskExecutor(); } }
import org.springframework.stereotype.Service; import java.util.concurrent.CompletableFuture; @Service public class CalculateService { @Async("threadPoolTaskExecutor") public CompletableFuture<Integer> calculate(Calc calc) { int result = Calculator.calculate(calc); return CompletableFuture.completedFuture(result); } }
import com.example.service.CalculateService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @RestController public class CalcController { @Autowired private CalculateService calculateService; @RequestMapping("/") public int calculate(@RequestBody Calc calc) throws ExecutionException, InterruptedException { CompletableFuture<Integer> resultFut = calculateService.calculate(calc); // some other calls/logic return resultFut.get(); } }
如果您想根据请求存储会话,请参阅this SO post