发送记录并等待其确认接收

问题描述 投票:10回答:2

我使用下面的类通过使用socket以同步方式或异步方式将数据发送到我们的消息队列,如下所示。

  • sendAsync - 它以异步方式发送数据,没有任何超时。发送(on LINE A)之后,它会添加到retryHolder存储桶,这样如果没有收到确认,那么它将再次从构造函数中启动的后台线程重试。
  • send - 它在内部调用sendAsync方法,然后在特定的超时时间内休眠,如果没有收到确认,那么它将从retryHolder存储桶中删除,以便我们不再重试。

所以上述两种方法之间的唯一区别是 - 对于异步,我需要不惜一切代价重试但是为了同步我不需要重试但看起来可能会重试,因为我们共享相同的重试桶缓存并重试线程运行每1秒钟。

ResponsePoller是一个类,它接收发送到我们的消息队列的数据的确认,然后调用下面的removeFromretryHolder方法删除地址,以便我们在收到确认后不重试。

public class SendToQueue {
  private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
  private final Cache<Long, byte[]> retryHolder =
      CacheBuilder
          .newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .removalListener(
              RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the `retryHolder` cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        // retry again
        for (Entry<Long, byte[]> entry : retryHolder.asMap().entrySet()) {
          sendAsync(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedRecords);
    // send data on a socket LINE A
    boolean sent = msg.send(socket);
    msg.destroy();
    retryHolder.put(address, encodedRecords);
    return sent;
  }

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // if key is not present, then acknowledgement was received successfully
    sent = !retryHolder.asMap().containsKey(address);
    // and key is still present in the cache, then it means acknowledgment was not received after
    // waiting for timeout period, so we will remove it from cache.
    if (!sent)
      removeFromretryHolder(address);
    return sent;
  }

  public void removeFromretryHolder(final long address) {
    retryHolder.invalidate(address);
  }
}

如果有人调用send方法我们不重试的最佳方式是什么,但我们仍然需要知道是否收到确认。唯一的问题是我根本不需要重试。

对于所有同步调用,我们是否需要单独的存储桶才能确认,我们不会从该存储桶重试?

java multithreading thread-safety guava race-condition
2个回答
2
投票

该代码存在许多潜在问题:

  • 在致电retryHolder#put之前可能会收到答复。
  • 当重试消息时,可能存在竞争条件。
  • 如果两个消息发送到同一个地址,第二个消息会覆盖第一个消息?
  • 发送总是浪费时间睡觉,使用wait + notify而不是。

我会存储一个更多状态的类。它可以包含重试处理程序可以检查的标志(retryIfNoAnswer是/否)。它可以使用waitForAnswer / markAnswerReceived提供wait / notify方法,以便send无需在固定时间内休眠。如果获得答案,waitForAnswer方法可以返回true,超时时返回false。在发送之前将对象放入重试处理程序并使用时间戳,以便仅重试超过特定年龄的消息。这解决了第一场比赛的情况。

编辑:下面更新的示例代码,编译您的代码,未经测试:

public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);

// Not sure why you are using a cache rather than a standard ConcurrentHashMap?
private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
    .concurrencyLevel(100)
    .removalListener(RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

private static class PendingMessage {
    private final long _address;
    private final byte[] _encodedRecords;
    private final Socket _socket;
    private final boolean _retryEnabled;
    private final Object _monitor = new Object();
    private long _sendTimeMillis;
    private volatile boolean _acknowledged;

    public PendingMessage(long address, byte[] encodedRecords, Socket socket, boolean retryEnabled) {
        _address = address;
        _sendTimeMillis = System.currentTimeMillis();
        _encodedRecords = encodedRecords;
        _socket = socket;
        _retryEnabled = retryEnabled;
    }

    public synchronized boolean hasExpired() {
        return System.currentTimeMillis() - _sendTimeMillis > 500L;
    }

    public synchronized void markResent() {
        _sendTimeMillis = System.currentTimeMillis();
    }

    public boolean shouldRetry() {
        return _retryEnabled && !_acknowledged;
    }

    public boolean waitForAck() {
        try {
            synchronized(_monitor) {
                _monitor.wait(500L);
            }
            return _acknowledged;
        }
        catch (InterruptedException e) {
            return false;
        }
    }

    public void ackReceived() {
        _acknowledged = true;
        synchronized(_monitor) {
            _monitor.notifyAll();
        }
    }

    public long getAddress() {
        return _address;
    }

    public byte[] getEncodedRecords() {
        return _encodedRecords;
    }

    public Socket getSocket() {
        return _socket;
    }
}

private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
}

public static SendToQueue getInstance() {
    return Holder.INSTANCE;
}

private void handleRetries() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage m : messages) {
        if (m.hasExpired()) {
            if (m.shouldRetry()) {
                m.markResent();
                doSendAsync(m, m.getSocket());
            }
            else {
                // Or leave the message and let send remove it
                cache.invalidate(m.getAddress());
            }
        }
    }
}

private SendToQueue() {
    executorService.submit(new ResponsePoller()); // another thread which receives acknowledgement and then delete entry from the cache accordingly.
    executorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            handleRetries();
        }
    }, 0, 1, TimeUnit.SECONDS);
}

public boolean sendAsync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, true);
    cache.put(address, m);
    return doSendAsync(m, socket);
}

private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
        // send data on a socket LINE A
        return msg.send(socket);
    }
    finally {
        msg.destroy();
    }
}

public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, socket, false);
    cache.put(address, m);
    try {
        if (doSendAsync(m, socket)) {
            return m.waitForAck();
        }
        return false;
    }
    finally {
        // Alternatively (checks that address points to m):
        // cache.asMap().remove(address, m);
        cache.invalidate(address);
    }
}

public void handleAckReceived(final long address) {
    PendingMessage m = cache.getIfPresent(address);
    if (m != null) {
        m.ackReceived();
        cache.invalidate(address);
    }
}
}

并且从ResponsePoller打来电话:

SendToQueue.getInstance().handleAckReceived(addressFrom);

1
投票

设计方面:我觉得你正在尝试编写一个线程安全且有效的NIO消息发送者/接收者但是(这两个)代码我在这里看到的并不好,并且不会没有重大改变。最好的办法是:

  • 充分利用0MQ框架。我在这里看到的事情和期望在ZMQjava.util.concurrent API中实际上是开箱即用的。
  • 或者看看Nettyhttps://netty.io/index.html),如果它适用于您的项目。 “Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。”如果您的项目变得复杂,这将节省您的时间,否则开始时可能会有点过分(但随后会出现问题......)。

但是,如果你认为你的代码或@ john的代码差不多,那么我只会给出建议:

  • 不要使用wait()notify()。不要sleep()
  • 使用单个线程为您的“流跟踪器”(即〜待处理的消息缓存)。

你实际上并不需要3个线程来处理待处理的消息,除非这个处理本身很慢(或者做很多事情),这不是这里的情况,因为你基本上做了异步调用(只要它真的是异步的..是它?)。

反向路径也是如此:只有在实际处理缓慢/阻塞或繁重时才使用执行程序服务(多个线程)来处理接收到的数据包。

我根本不是0MQ的专家,但就socket.send(...)而言,它是线程安全且无阻塞的(我个人不确定 - 告诉我)上述建议应该是正确的并且使事情更简单。

那就是说,要严格回答你的问题:

对于所有同步调用,我们是否需要单独的存储桶才能确认,我们不会从该存储桶重试?

我会说不,所以您如何看待以下内容?根据您的代码并独立于我自己的感受,这似乎是可以接受的:

public class SendToQueue {
    // ...
    private final Map<Long, Boolean> transactions = new ConcurrentHashMap<>();
    // ...

    private void startTransaction(long address) {
        this.transactions.put(address, Boolean.FALSE);
    }

    public void updateTransaction(long address) {
        Boolean state = this.transactions.get(address);
        if (state != null) {    
            this.transactions.put(address, Boolean.TRUE);
        }
    }

    private void clearTransaction(long address) {
        this.transactions.remove(address);
    }

    public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
        boolean success = false;

        // If address is enough randomized or atomically counted (then ok for parallel send())
        startTransaction(address);
        try {
            boolean sent = sendAsync(address, encodedRecords, socket);
            // if the record was sent successfully, then only sleep for timeout period
            if (sent) {
                // wait for acknowledgement
                success = waitDoneUntil(new DoneCondition() {
                    @Override
                    public boolean isDone() {
                        return SendToQueue.this.transactions.get(address); // no NPE
                    }
                }, 500, TimeUnit.MILLISECONDS);

                if (success) {
                    // Message acknowledged!
                }
            }
        } finally {
            clearTransaction(address);
        }
        return success;
    }

    public static interface DoneCondition {
        public boolean isDone();
    }

    /**
     * WaitDoneUntil(Future f, int duration, TimeUnit unit). Note: includes a
     * sleep(50).
     *
     * @param f Will block for this future done until maxWaitMillis
     * @param waitTime Duration expressed in (time) unit.
     * @param unit Time unit.
     * @return DoneCondition finally met or not
     */
    public static boolean waitDoneUntil(DoneCondition f, int waitTime, TimeUnit unit) {
        long curMillis = 0;
        long maxWaitMillis = unit.toMillis(waitTime);

        while (!f.isDone() && curMillis < maxWaitMillis) {
            try {
                Thread.sleep(50);   // define your step here accordingly or set as parameter
            } catch (InterruptedException ex1) {
                //logger.debug("waitDoneUntil() interrupted.");
                break;
            }
            curMillis += 50L;
        }

        return f.isDone();
    }
    //...
}

public class ResponsePoller {
    //...

    public void onReceive(long address) {   // sample prototype     
        // ...
        SendToQueue.getInstance().updateTransaction(address);
        // The interested sender will know that its transaction is complete.
        // While subsequent (late) calls will have no effect.
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.