Java:使用 executorService 运行异步任务

问题描述 投票:0回答:1

我有两个端点/issue和/callback,并且/callback将在/issue被调用后立即被调用。在问题中,我向执行器服务提交一个任务,并将未来存储到一个映射中,其中键作为订单 id,值作为执行器服务返回的未来,当调用回调时,它将使用 orderid 从地图中获取未来并等待要完成的问题任务(使用map.get(orderid).get())然后继续。现在假设我调用 /issue 并提交任务,甚至在 future 存储在 hashmap 之前调用 /callback,并且 Map.get(orderid).get() 抛出错误?我想确保在调用 /callback 之前 future 进入地图。这怎么办?

public class CallbackServiceImpl implements CallbackService {

    private final OrderService orderService;
    private final OrderRepository orderRepository;
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final Map<String, Future<?>> issueCertificateFutureMap = new ConcurrentHashMap<>();
    private final Map<String, Future<?>> renewCertificateFutureMap = new ConcurrentHashMap<>();

    @Override
    public IssueCertificateAsyncResponseDto issueCertificateExecutorService(ConfirmSSLOrderRequestDto confirmSSLOrderRequestDto){
        String orderId = confirmSSLOrderRequestDto.getOrderid();
        Runnable issueCertificateRunnable = () -> {
          try {
              orderService.issueCertificate(confirmSSLOrderRequestDto);
              log.info("ISSUE ORDER COMPLETED for order {}", confirmSSLOrderRequestDto.getOrderid());
          }
          catch (Exception e){
              log.error(e.getMessage());
              throw new AppException(e.getMessage());
          }
        };
        Future<?> issueCertificateFuture = executorService.submit(issueCertificateRunnable);
        issueCertificateFutureMap.put(orderId, issueCertificateFuture); //I have to make sure this goes into the map
        return IssueCertificateAsyncResponseDto.builder()
                .caOrderId(confirmSSLOrderRequestDto.getOrderid())
                .build();
    }
    private void handleIssueOrderCallbackRequest(SSLIssueOrderCallBackDto sslIssueOrderCallBackDto){
        String orderId = sslIssueOrderCallBackDto.getCaOrderId();
        Runnable issueCertificateCallBackRunnable = () -> {
            try{
                issueCertificateFutureMap.get(orderId).get(); // before this gets executed
                //do callback task
            }
            catch (Exception e) {
                Thread.currentThread().interrupt();
                throw new AppException(e.getMessage());
            }
            finally {
                issueCertificateFutureMap.remove(orderId);
            }

        };
        executorService.submit(issueCertificateCallBackRunnable);
    }
}
java multithreading threadpool executorservice
1个回答
0
投票

我认为你正在寻找的是两个线程之间的同步,而你有第一个线程作为生产者,第二个线程作为消费者

根据您的代码,最简单的解决方案是将它们与 wait / notifyAll 同步

当你在一个线程上调用“wait”时,你让它停止执行并给其他线程运行的选项,它将停止,直到另一个线程在该对象上调用notify/notifyAll,在我的示例中使用“this”

示例解决方案如下所示:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallbackServiceImpl implements CallbackService {

    private final OrderService orderService;
    private final OrderRepository orderRepository;
    private final ExecutorService executorService = Executors.newCachedThreadPool();
    private final Map<String, Future<?>> issueCertificateFutureMap = new ConcurrentHashMap<>();
    private final Map<String, Future<?>> renewCertificateFutureMap = new ConcurrentHashMap<>();

    @Override
    public IssueCertificateAsyncResponseDto issueCertificateExecutorService(ConfirmSSLOrderRequestDto confirmSSLOrderRequestDto){
        String orderId = confirmSSLOrderRequestDto.getOrderid();
        Runnable issueCertificateRunnable = () -> {
            try {
                orderService.issueCertificate(confirmSSLOrderRequestDto);
                log.info("ISSUE ORDER COMPLETED for order {}", confirmSSLOrderRequestDto.getOrderid());
            }
            catch (Exception e){
                log.error(e.getMessage());
                throw new AppException(e.getMessage());
            }
        };
        Future<?> issueCertificateFuture = executorService.submit(issueCertificateRunnable);
        issueCertificateFutureMap.put(orderId, issueCertificateFuture); //I have to make sure this goes into the map
        synchronized (this) {
            notifyAll();
        }
        return IssueCertificateAsyncResponseDto.builder()
                .caOrderId(confirmSSLOrderRequestDto.getOrderid())
                .build();
    }
    private void handleIssueOrderCallbackRequest(SSLIssueOrderCallBackDto sslIssueOrderCallBackDto){
        String orderId = sslIssueOrderCallBackDto.getCaOrderId();
        Runnable issueCertificateCallBackRunnable = () -> {
            try{
                synchronized (this) {
                    while (!issueCertificateFutureMap.containsKey(orderId)) {
                        // before this gets executed
                        wait(); // you could add some timeout, so 
                    }
                }
                issueCertificateFutureMap.get(orderId).get(); // before this gets executed
                //do callback task
            }
            catch (Exception e) {
                Thread.currentThread().interrupt();
                throw new AppException(e.getMessage());
            }
            finally {
                issueCertificateFutureMap.remove(orderId);
            }

        };
        executorService.submit(issueCertificateCallBackRunnable);
    }
}```

The reason I have while loop is because you might be notified for different orderIds, and you want to be waiting for the one that in the map
You may want to exit the loop after sometime, so maybe a timeout to "wait" is a better option, its just depends on your needs
© www.soinside.com 2019 - 2024. All rights reserved.