Communication Manager,控制异步网络消息传递设计的速率

问题描述 投票:0回答:1

我有一个我需要访问的系统,对该系统的访问速率是1次API调用。但是,我希望通过异步的Web界面提供对它的访问。我有一个设计,需要一个专用的通信管理器线程,它将消息收集到一个队列中,并一次传输一个,然后用消息的结果回拨给发送者。

这是一个好方法吗?您是否看到我的代码中存在任何明显的陷阱?

public class CommunicationManager implements Runnable
{
    private BlockingQueue<Message> message = new LinkedList<> ();

    private boolean shutdown = false;

    private Messenger messenger = new Messenger();

    public CommunicationManager() {}

    public void run()
    {
        long elapsed, start, diff;
        start = 0;

        while (!shutdown)
        {
            elapsed = System.currentTimeMillis();
            diff = elapsed - start;
            if (diff < 1000)
            {
                Thread.sleep(1000 - diff);
            }
            if (!message.isEmpty())
            {
                Message next = message.remove();
                next.getSender().recieve(messenger.send(next.getMessage()));
            }
            start = elapsed;
        }           
    }

    public synchronized void addMessage(Sender sender, String message)
    {
        this.message.add(new Message(sender, message));
    }

    public synchronized void shutdown()
    {
        this.shutdown = true;
    }
}

这个管理器的有希望的结果是每个循环,如果从上一个循环开始后没有经过第二个循环,它将在剩余时间内休眠。然后它将检查队列是否为空。如果队列不为空,它将检索队列中的下一条消息,发送消息并将结果返回给发送方的回调。然后循环结束并再次启动循环。

我使用了BlockingQueue来避免在我删除队列中的最后一条消息时有人向队列添加消息的问题。我不认为默认的Queue结构是线程安全的,所以我需要一些方法来防止这种情况发生。

java thread-safety synchronized
1个回答
0
投票

事实证明我在这里重新发明了一点轮子。使用Java ScheduledExecutorService可以更轻松地完成我真正想要做的事情。

class CommunicationControl
{
   private final ScheduledExecutorService scheduluer = Executors.newScheduledThreadPool(1);

   public void startManager()
   {
      final CommunicationManager manager = new CommunicationManager();
      scheduler.scheduleAtFixedRate(manager, 10, 1, SECONDS);
   }

   public void stopManager()
   {
      while (manager.getMessageCount() > 0)
      {
          try
          {
              Thread.sleep(manager.getMessageCount() * 1000);
          } catch (InterruptedException e)
          {
              e.printStackTrace();
          }
      }
      scheduler.shutdown();
   }
}

这里原始课程的改造:

class CommunicationManager implements Runnable
{
   private BlockingQueue<Message> message = new ArrayBlockingQueue<Message> (1000);
   private Messenger messenger = new Messenger();

   public CommunicationManager() {}

   public void run()
   {
      Message next = message.poll();
      if (next != null)
      {
         next.getSender().recieve(messenger.send(next.getMessage()));
      }
   }

   public void addMessage(Sender sender, String message)
   {
       try
       {
          while (!this.message.offer(new Message(sender, message), 1, TimeUnit.SECONDS)) {}
       } catch (InterruptedException e)
       {
          e.printStackTrace();
       }
   }
}

这将做的是首先创建一个执行器来处理我的Communication Manager运行的频率,在这种情况下,在10秒的延迟后每秒一次。然后,每当发生这种情况时,CommuncationManager的Run方法将执行恰好一个工作单元(如果可用)。

只要存在队列空间,其他方法可以自由地向CommunicationManager添加工作,否则它们将阻塞直到空间可用。

这也使我能够将其构建到Spring等框架中。

© www.soinside.com 2019 - 2024. All rights reserved.