spring boot - 为多个异步调用映射对请求的正确响应

问题描述 投票:3回答:2

我写了一个spring-boot应用程序,它从用户那里收到一个名为Calc的对象,它包含两个参数,并返回一个包含复杂计算的答案(计算本身与问题无关)。因为系统可能正忙,所以每个对象都进入队列,并且有一个调度程序按顺序在队列中传递,并执行计算。

我的问题是如何将项目计算的结果返回给正确的请求。

我已经包含了我写的代码:

控制器:

@RestController
public class CalcController {

    @Autowired
    private CalculateService calculateService;

    @RequestMapping("/")
    public int calculate(@RequestBody Calc calc) {
        return calculateService.calculate(calc);
    }
}

计算对象:

@Data
public class Calc {
    private int paramA;
    private int paramB;
}

CalculateService:

@Service
public class CalculateService {
    private BlockingQueue<Calc> calcQueue;

    @PostConstruct
    private void init() {
        calcQueue = new LinkedBlockingDeque<>();
    }

    public int calculate(Calc calc) {
        calcQueue.add(calc);

        // TODO: Return calculation result.
        return 0;
    }

    @Scheduled(fixedRate = 2000)
    public void calculateQueue() throws InterruptedException {
        while (!calcQueue.isEmpty()) {
            Calc calc = calcQueue.take();
            int result = Calculator.calculate(calc);
            // TODO: Return calculation result to the right request.
        }
    }
}

谢谢

java multithreading spring-boot
2个回答
0
投票

您可以使用基本上维护内部队列的ExecutorService将您的工作请求分派给多个线程。

class Service {
    // use 4 threads; optimal is the amount of processor cores available
    private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(4);

    public int calculate(int input) {
        Future<Integer> future = EXECUTOR.submit(() -> Calculator.calculate(input));
        return future.get(); // this actually thrown an Exception you have to catch
    }
}

当执行者有一个可用线程时,submit将调用Calculator.calculate()future.get()将提取该调用的实际结果。

请注意,此代码会一直阻塞,直到计算结果为止,只有计算本身才是并行的。如果您确实希望立即返回并在以后提供结果,那么这是一个不同的故事,但它并不真正适合REST控制器概念。

您还可以使用CompletableFuture使此代码更简单

class Service {
    public int calculate(int input) {
        return CompletableFuture.supplyAsync(() -> Calculator.calculate(input)).get();
    }
}

0
投票

你可以使用Spring的@Async能力

  1. 创建一个线程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@EnableAsync
@Configuration
public class ThreadConfig {
    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        return new ThreadPoolTaskExecutor();
    }
}
  1. 更新calculateService,您不需要在Queue中存储对象,它将由Spring的异步实用程序处理
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;

@Service
public class CalculateService {
    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Integer> calculate(Calc calc) {
        int result = Calculator.calculate(calc);
        return CompletableFuture.completedFuture(result);
    }
}
  1. 更新控制器方法
import com.example.service.CalculateService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@RestController
public class CalcController {

    @Autowired
    private CalculateService calculateService;

    @RequestMapping("/")
    public int calculate(@RequestBody Calc calc) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> resultFut = calculateService.calculate(calc);
        // some other calls/logic
        return resultFut.get();
    }
}

如果您想根据请求存储会话,请参阅this SO post

© www.soinside.com 2019 - 2024. All rights reserved.