具有延迟序列和SSE的状态服务-如何以容错方式进行分配?

问题描述 投票:0回答:1

我编写了一个Web服务,使用Clojure中的惰性序列和各种无限序列公式(Euler,Leibniz)来生成Pi的估计值。 Clojure服务通过服务器发送事件通道发送这些估计。当前,HTML / JS视图正在使用Vue.js消耗SSE事件并显示它们。

只要不关闭SSE通道的连接,它在具有单个节点的服务中就可以很好地工作。但是到目前为止,如果连接已关闭或服务中断,它就不会持续或备份减少状态(无限系列中的位置)以从故障中恢复。另外,由于状态包含在服务的本地内存中(在Clojure序列值中),因此没有水平可伸缩性,例如,如果长期内存状态驻留在Redis中,则会存在这种情况。在这种情况下,仅添加新节点将无法提供实际划分工作的方式-只会复制相同的系列。使用Redis卸载长期内存状态是我用于无状态Web服务的一种设置,用于简化水平扩展和容错策略。

在这种有状态的情况下,我对于如何使用可并行处理序列项的分布式多节点解决方案扩展Clojure服务感到困惑。也许可以有一个调度“主”服务,将序列范围委托给不同的节点,并同时(通过Redis pub / sub)从节点接收结果,对它们进行数学汇总并生成视图的最终SSE流?在那种情况下,主服务将使用间隔约一千的无数个数字来产生范围边界,并行节点可使用该范围来初始化非无限Clojure序列(可能仍然很懒)?当然,在这种情况下,我需要标记进入的序列范围是完整的,并在范围处理期间出现节点故障的情况下,采用重试策略。

我正在研究Kubernetes状态集,以熟悉状态服务的部署模式,尽管我还没有遇到适合此特定问题的模式或解决方案。如果这是一种无状态服务,那么Kubernetes解决方案将是显而易见的,但是有状态方法却使我在Kubernetes环境中处于空白状态。

有人能为我指出这里的建筑方向吗?假设我确实希望保持封装在Clojure惰性序列(即本地服务内存)中的系列术语的状态,那么我在划分工作的策略上是否走对了轨道?

这里是单节点Clojure服务的相关代码:

(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]
  )
)

(defn seq-of-terms
   [func]
   (map func (iterate (partial + 1) 0))
)

(defn euler-term [n]
  (let [current (+ n 1)] (/ 6.0 (* current current)))
)

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Euler series for increasing amounts of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum))  (reductions + (seq-of-terms euler-term) ))
)

(defn leibniz-term [n] ; starts at zero
   (let [
          oddnum (+ (* 2.0 n) 1.0)
          signfactor (- 1 (* 2 (mod n 2)))
        ]
        (/ (* 4.0 signfactor) oddnum)
  )
)

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Leibniz series for increasing amounts of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))

(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    (Thread/sleep 150) ; we must use a naive throttle here to prevent an overflow on the core.async CSP channel, event-ch
    (async/put! event-ch (str item))
  )
)

(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let
    [
      {:keys [request response-channel]} ctx
      lazy-list euler-reductions
    ]
    (send-result event-ch 10 lazy-list)
  )
)

(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let
    [
      {:keys [request response-channel]} ctx
      lazy-list leibniz-reductions
    ]
    (send-result event-ch 10 lazy-list)
  )
)


;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
    ["/euler" {:get [::send-result
                    (sse/start-event-stream sse-euler-stream-ready)]}]
    ["/leibniz" {:get [::send-result-leibniz
                      (sse/start-event-stream sse-leibniz-stream-ready)]}]
    ]]])

(def url-for (route/url-for-routes routes))

(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              }
)

完整代码在https://github.com/wclark-aburra-code/pi-service。内联Vue.js代码,它占用SSE流。

我编写了一个Web服务,使用Clojure中的惰性序列和各种无限序列公式(Euler,Leibniz)来生成Pi的估计值。 Clojure服务通过服务器发送的这些估计值发送...

kubernetes clojure lazy-evaluation server-sent-events horizontal-scaling
1个回答
0
投票

如果只是为了扩展,我认为您不需要坚持任何措施。您需要的只是一个调度“主”(可能是客户端本身),以从多个后端请求分块的序列,并重新组装它们以正确的顺序进行交付。

© www.soinside.com 2019 - 2024. All rights reserved.