带有aleph的简单TCP客户端

问题描述 投票:0回答:0

我正在尝试通过一个简单的异步客户端连接TCP服务器,其代码我无法控制(套接字需要保持打开状态)。

通信是通过字节数组进行的,我有一些自定义函数来创建和解析各种字节数组以发送到该服务器。所以我不需要通过光泽进行任何编码或解码。但我确实需要一个非阻塞但连接的请求响应机制。

Aleph 看起来很棒,但我一直在尝试为我的用例实现 TCP 客户端。到目前为止,我有以下内容:

    (:require [manifold.deferred :as d]
              [manifold.stream :as s]
              [aleph.tcp :as tcp])
             

    (def aleph-client (atom nil)) 

    ;store my initial connection into the atom
    (reset! aleph-client @(tcp/client {:host (:HOST CONFIG) :port (:PORT CONFIG)}))

    (defn msg! 
      "A very simple request response method"
      [ bytebuf ]
       (d/let-flow [status (s/put! @aleph-client (.array bytebuf))
                    reply  @(s/take! @aleph-client)]
                 reply))

在上面的代码中,我能够将正确且预期的字节数组发送到服务器,并且我也收到响应。但是我无法正确接收和格式化消息。我总是看到以下内容:

(msg! my-bytes)
=> #object["[B" 0x7ad08644 "[B@7ad08644"]

;I seem to be getting a byte-buffer somehow but the byte array length
;is simply wrong for longer messages.
(alength (byte-buffer (msg! my-bytes)))
=>39

我不知道下一步该做什么。特别是:

  • 如何接收bytebuffer响应?
  • 有没有一种优雅的方式来使用我自己的自定义字节缓冲区创建和解码的wrap-duplex-stream?
  • 如何正确连接 put 和 take ?
  • 当我分别执行 put 和 take 时,这个神秘的响应是什么?
  • 我最后收到的bytebuffer确实是我期望的响应。至少是其中的一部分。有些消息在某个地方丢失了,我可以在连续拍摄时看到它!称呼。这是为什么?
  • 如何使用缓冲区来放入和取出更大的消息?

感谢 Aleph 和 Manifold 专家的任何帮助。

asynchronous tcp clojure aleph
© www.soinside.com 2019 - 2024. All rights reserved.