Java ExecutorService:- 当事件发生时通知线程唤醒

问题描述 投票:0回答:2

我有一个 Manager 类,多个线程向其中注册自己(使用

UUID
为每个请求生成唯一标识符),提供要处理的有效负载并从管理器获取相应的响应。我正在使用
java.util.concurrent.ExecutorService
启动多个线程。这是一个用于测试我的管理器功能的实现 -

public class ManagerTest {
    public static void main(String[] args) {
        try {
            Manager myManager = new Manager();
            // Start listening to the messages from different threads
            myManager.consumeMessages();
            int num_threads = Integer.parseInt(args[0]);
            ExecutorService executor = Executors.newFixedThreadPool(num_threads);

            for (int i = 0; i < num_threads; i++) {
                // class implementation is given below
                Runnable worker = new MyRunnable(myManager);
                executor.execute(worker);
            }
            executor.shutdown();
            // Wait until all threads are finish
            while (!executor.isTerminated()) {

            }
            System.out.println("\nFinished all threads");

            myManager.closeConnection();

        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

这里是

MyRunnable
类的实现

class MyRunnable implements Runnable {
    private Manager managerObj;
    public MyRunnable(Manager managerObj) {
        this.managerObj = managerObj;
    }

    @Override
    public void run() {     
        try {
            Random rand = new Random();
            int  n = rand.nextInt(35);
            String requestId = UUID.randomUUID().toString();
            managerObj.registerRequest(requestId, n);
            managerObj.publishMessage(requestId);
            // Want to avoid this while loop
            while( ! managerObj.getRequestStatus(requestId)){

            }
            int response = managerObj.getRequestResponse(requestId);
            // do something else
            managerObj.unregisterRequest(requestId);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

管理器将处理请求,并且根据有效负载,请求的响应可能需要不同的时间。每当管理器收到响应时,它都会通过调用此函数将请求状态设置为 true

setRequestStatus(requestId)
。此后,线程将从
while loop
退出并继续执行。

代码如果工作正常,但线程做了太多工作,则需要不断循环 while 循环,直到满足条件。

他们是一种在向管理器发送请求后使线程休眠的方法,并且管理器在其响应准备好时通知该线程唤醒。

如果这对某人来说太简单了,请原谅我,我是 java 和 java 线程接口的新手。

java multithreading concurrency java.util.concurrent java-threads
2个回答
4
投票

你正在做的是一个繁忙的自旋循环,这是一件非常糟糕的事情,因为a)它消耗了每个线程的完整CPU核心,b)它实际上使CPU保持繁忙,这意味着它是从可能有有用工作要做的其他线程窃取处理时间。

有很多方法可以解决这个问题,我将从最坏到最好列出它们。

  • 改进代码的最简单方法是调用

    java.lang.Thread.sleep(long millis)
    方法,并将其作为参数传递
    0
    。这也称为“yield”操作,它本质上意味着“如果有任何其他线程有一些有用的工作要做,让它们运行,并在完成后返回给我”。这仅比 busy-spin-looping 好一点,因为它仍然会消耗 100% CPU。好处是它只会消耗CPU,而其他线程没有任何事情可做,所以至少不会减慢其他事情。

  • 改进代码的下一个最佳但仍然不是很聪明的方法是调用

    java.lang.Thread.sleep(long millis)
    方法并将其
    1
    作为参数传递。这被称为“传递”操作,它本质上意味着“将剩余的时间片释放给可能有一些有用工作要做的任何其他线程”。换句话说,即使整个系统不需要做任何有用的工作,剩余的时间片也会被放弃。这将使 CPU 消耗几乎降至零。缺点是 a) CPU 消耗实际上略高于零,b) 机器将无法进入某些低功耗睡眠模式,以及 c) 您的工作线程的响应速度会稍差:它将恢复正常仅在时间片边界上完成的工作。

  • 解决问题的最佳方法是使用java内置的同步机制,如本答案所述:https://stackoverflow.com/a/5999146/773113这不仅会消耗零CPU,但它甚至可以让机器在等待时进入低功耗模式。

  • 为了解决最常见情况的问题,您不想只是等到条件出现,而是实际上还传递有关已完成的工作或要完成的工作的信息,您可以使用

    BlockingQueue
    。 (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html)阻塞队列使用java内置的同步机制并且允许一个线程将信息传递给另一个人。


2
投票

@MikeNakis 已经给出了很好的答案,但我也倾向于提供另一种选择。

此选项涉及更改 Manager API 以返回 Future

我建议对经理的改变如下:

  • 删除
    getRequestStatus()
    方法
  • 使
    getRequestResponse()
    返回
    Future<Integer>

完成这些更改后,

MyRunnable
run()
方法可以更改为:

public void run() {     
    try {
        Random rand = new Random();
        int  n = rand.nextInt(35);
        String requestId = UUID.randomUUID().toString();
        managerObj.registerRequest(requestId, n);
        managerObj.publishMessage(requestId);

        // Future.get() blocks and waits for the result without consuming CPU
        int response = managerObj.getRequestResponse(requestId).get();
        // do something else
        managerObj.unregisterRequest(requestId);

    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}

实现

Future<>
的最简单方法可能是使用 Java 的
java.util.concurrent.FutureTask

© www.soinside.com 2019 - 2024. All rights reserved.