等到我在 Java 多线程中收到信号

问题描述 投票:0回答:2

我有一个接收异步消息的多线程应用程序。如何等到收到消息(或超时)?是否有更好的方法使用 wait() 和 notify() 来做到这一点?

我试着用带有原子引用的原始线程睡眠来做到这一点。

class SignalReceiver{
    String message;
    Boolean messageReceived; // used AtomicBoolean

    void receive(String message){
        this.message = message;
        messageReceived = true; // set message flag
    }

    void waitTillMessageReceived(long timeout){
       if(!messageReceived){ // message could be received before 
          while(!messageReceived){
              Thread.sleep(100);
              // wait only till timeout
          }
       }
       messageReceived = false; // reset message flag
    }
}
java multithreading java-threads thread-synchronization
2个回答
1
投票

这是一个带有等待/通知的例子

公共类 SignalReceiver{ 字符串消息;

public static void main(String[] args) {
    final SignalReceiver sr=new SignalReceiver();
    
    new Thread(() -> {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
        }
        System.out.println("message sent");
        sr.receive("hello");
    }).start();;
    
    System.out.println("waiting...");
    String mess=sr.waitTillMessageReceived(5000L);
    
    System.out.println("message: "+mess);
}

public synchronized  void receive(String message){
    this.message = message;
    notify();
}

public synchronized String waitTillMessageReceived(long timeout){
   try {
       wait(timeout);
   }
   catch (InterruptedException e) {
      
   }
   return message;
}

}

这个例子提出了一些问题。例如,它不能正确处理许多同步消息的到达。 正如评论中所建议的,最好使用适当的同步类,如 java.util.concurrent.LinkedBlockingDeque。


0
投票

BlockingQueue

我遵循了Louis Wassermanmarkspace提供的线索,建议使用队列。

我使用了

BlockingQueue
,它 (a) 是线程安全的,并且 (b) 在添加和删除元素时可以阻塞。我从该接口的 Javadoc 上给出的示例代码开始。

对于

BlockingQueue
接口的实现,我选择了
ArrayBlockingQueue

我创建了一对生产者-消费者类。他们通过 UUID

 类生产和消费随机生成的 
UUID 值。

这里是生产者类。在无限循环中,

run
方法尝试将 UUID 对象添加到队列中。如果队列当前繁忙,则尝试添加块,并在队列可用时继续。如果被中断,例如被执行者服务关闭,这个
run
方法完成,从而结束提交的任务。

package work.basil.example.async;

import java.time.Duration;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class Producer implements Runnable
{
    // Member fields.
    private final BlockingQueue < UUID > queue;

    // Constructor.
    Producer ( BlockingQueue < UUID > q ) { queue = q; }

    public void run ( )
    {
        System.out.println( "`Producer` object is starting its `run` method. " + Instant.now() );
        try
        {
            while ( true )
            {
                // Sleep, to simulate some lengthy work.
                Duration timeToSleep = Duration.ofMillis( ThreadLocalRandom.current().nextInt( 1_000 , 3_000 ) );
                try { Thread.sleep( timeToSleep ); } catch ( InterruptedException e ) { break; }
                queue.put( produce() );  // Post a new value to our BlockingQueue.
            }
        }
        catch ( InterruptedException e )
        {
            // Could be interrupted by an executor service closing, or other reasons.
            // Simply let this `Runnable` object end its `run` method.
            // No code needed here.
        }
        System.out.println( "`Producer` object is ending its `run` method, after being interrupted. " + Instant.now() );
    }

    // Logic
    UUID produce ( )
    {
        UUID uuid = UUID.randomUUID();
        System.out.println( "Producing UUID = " + uuid + " at " + Instant.now() );
        return uuid;
    }
}

还有消费阶层。

run
方法请求队列中的下一个可用元素。调用会阻塞,直到元素可用为止。在等待期间,如果执行程序服务关闭等中断发生,
run
方法完成,从而结束提交的任务。

package work.basil.example.async;

import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable
{
    private final BlockingQueue < UUID > queue;

    Consumer ( BlockingQueue < UUID > q ) { queue = q; }

    public void run ( )
    {
        System.out.println( "`Consumer` object is starting its `run` method. " + Instant.now() );
        try
        {
            while ( true ) { consume( queue.take() ); }
        }
        catch ( InterruptedException e )
        {
            // Could be interrupted by an executor service closing, or other reasons.
            // Simply let this `Runnable` object end its `run` method.
            // No code needed here.
        }
        System.out.println( "`Consumer` object is ending its `run` method, after being interrupted. " + Instant.now() );
    }

    void consume ( UUID uuid )
    {
        System.out.println( "Consuming UUID: " + uuid + " at " + Instant.now() );
    }
}

还有一个应用程序类来演示队列的运行情况。

请注意,您有责任在某个时候优雅地关闭您的执行程序服务,至少在您的应用程序执行结束时。否则线程的后备池可能会无限期地运行,就像僵尸🧟u200d♂️。

package work.basil.example.async;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.*;

public class App
{
    public static void main ( String[] args )
    {
        System.out.println( "INFO - Demo start. " + Instant.now() );

        BlockingQueue q = new ArrayBlockingQueue( 10 , true );
        Consumer c = new Consumer( q );
        Producer p = new Producer( q );

        ExecutorService executorService = Executors.newFixedThreadPool( 2 );
        executorService.submit( c );
        executorService.submit( p );

        // Let the background threads do their work for a while.
        try { Thread.sleep( Duration.ofSeconds( 8 ) ); } catch ( InterruptedException e ) { throw new RuntimeException( e ); }
        System.out.println( "INFO - Main thread is interrupting tasks run by executor service." );
        executorService.shutdownNow();
        try
        {
            if ( ! executorService.awaitTermination( 30 , TimeUnit.SECONDS ) )
            {
                System.err.println( "Executor service failed to terminate. " + Instant.now() );
            }
        }
        catch ( InterruptedException e ) { throw new RuntimeException( e ); }

        System.out.println( "INFO - Demo end. " + Instant.now() );
    }
}

这似乎有效。

INFO - Demo start. 2023-02-28T07:24:03.603169Z
`Consumer` object is starting its `run` method. 2023-02-28T07:24:03.614092Z
`Producer` object is starting its `run` method. 2023-02-28T07:24:03.614092Z
Producing UUID = 5fe7dd8a-9cc1-47b8-93aa-87bc031c2534 at 2023-02-28T07:24:04.855110Z
Consuming UUID: 5fe7dd8a-9cc1-47b8-93aa-87bc031c2534 at 2023-02-28T07:24:04.863520Z
Producing UUID = 97a31391-8c5c-4430-9737-b967a8c63987 at 2023-02-28T07:24:06.678767Z
Consuming UUID: 97a31391-8c5c-4430-9737-b967a8c63987 at 2023-02-28T07:24:06.679680Z
Producing UUID = af485e45-5dd5-4b68-82c8-41e1443c4566 at 2023-02-28T07:24:08.337011Z
Consuming UUID: af485e45-5dd5-4b68-82c8-41e1443c4566 at 2023-02-28T07:24:08.337917Z
Producing UUID = 4853a43c-feb6-41ec-91b0-1520c9f67347 at 2023-02-28T07:24:10.927173Z
Consuming UUID: 4853a43c-feb6-41ec-91b0-1520c9f67347 at 2023-02-28T07:24:10.928794Z
INFO - Main thread is interrupting tasks run by executor service.
`Consumer` object is ending its `run` method, after being interrupted. 2023-02-28T07:24:11.619844Z
`Producer` object is ending its `run` method, after being interrupted. 2023-02-28T07:24:11.619725Z
INFO - Demo end. 2023-02-28T07:24:11.621988Z

警告: 我不是并发问题的专家。使用风险自负。

© www.soinside.com 2019 - 2024. All rights reserved.