core.async重新连接网络套接字

问题描述 投票:0回答:1

core.async noob,但尝试通过构建自动重新连接的websocket学习。这个想法是将套接字抽象化,以便使用它的任何人都不必担心它是否已连接,而只需将消息放在通道上即可。如果已连接套接字,则会立即从通道读取消息并发送消息。如果套接字当前未连接,则一旦建立连接,它将反复尝试重新连接并等待发送所有未决消息。

这是我到目前为止所拥有的:

(ns frontend.socket
  (:require-macros [cljs.core.async.macros :as asyncm :refer [go]])
  (:require [cljs.core.async :as async :refer [<! >! chan timeout]]))

(def endpoint "ws://localhost:8080")
(def socket (atom nil))
(def open (chan))
(def in (chan))
(def out (chan))
(def error (chan))
(def close (chan))

(defn open? []
  (= (.-readyState @socket) (.-OPEN @socket)))

(defn event>chan
  "Given a core.async channel, returns a
  function which takes an event and feeds
  it into the formerly given channel."
  [channel]
  (fn [e]
    (go (>! channel e))))

(defn connect
  "Opens a websocket, stores it in the socket
  atom, and feeds the socket's events into
  the corresponding channels."
  []
  (let [s (js/WebSocket. endpoint)]
    (reset! socket s)
    (set! s.onopen    (event>chan open))
    (set! s.onmessage (event>chan in))
    (set! s.onerror   (event>chan error))
    (set! s.onclose   (event>chan close))))

(defn init
  "This is the entry point from outside this
  namespace. Eagerly connects and then, if
  ever disconnected, attempts to reconnect
  every second. Also takes messages from
  the out channel and sends them through
  the socket."
  []
  (go
    (while true
      (connect)
      (<! close)
      (<! (timeout 1000))))
  (go
    (while true
      (let [m (<! out)]
        (when (or (open?) (<! open))
          (.send @socket m))))))

重新连接逻辑似乎工作正常,但是在套接字关闭后尝试发送消息时遇到了问题。在最后几行中,我检查在发送消息之前,套接字是打开的还是等待它打开。问题是,如果在先前打开套接字,然后关闭然后再发送消息的同时没有发送消息,那么在打开的通道上仍然有东西,因此无论如何都将发送该消息。因此,公开频道可以像以前一样“过时”。

我想过只要套接字关闭就从开放通道中取出,但这正好解决了这个问题:那么当我确实需要它们时,开放通道可能会丢失值,并且在重新连接时不会发送消息。

我在这里做错了什么?

clojurescript core.async
1个回答
0
投票

这是错误的方法,因为通道将等待被消耗。我需要改用pub / sub:https://github.com/clojure/core.async/wiki/Pub-Sub/549da1843c7bbe6009e9904eed49f91000d8ce2c

© www.soinside.com 2019 - 2024. All rights reserved.