我编写了一个Web服务,使用Clojure中的惰性序列和各种无限序列公式(Euler,Leibniz)来生成Pi的估计值。 Clojure服务通过服务器发送事件通道发送这些估计。当前,HTML / JS视图正在使用Vue.js消耗SSE事件并显示它们。
只要不关闭SSE通道的连接,它在具有单个节点的服务中就可以很好地工作。但是到目前为止,如果连接已关闭或服务中断,它就不会持续或备份减少状态(无限系列中的位置)以从故障中恢复。另外,由于状态包含在服务的本地内存中(在Clojure序列值中),因此没有水平可伸缩性,例如,如果长期内存状态驻留在Redis中,则会存在这种情况。在这种情况下,仅添加新节点将无法提供实际划分工作的方式-只会复制相同的系列。使用Redis卸载长期内存状态是我用于无状态Web服务的一种设置,用于简化水平扩展和容错策略。
在这种有状态的情况下,我对于如何使用可并行处理序列项的分布式多节点解决方案扩展Clojure服务感到困惑。也许可以有一个调度“主”服务,将序列范围委托给不同的节点,并同时(通过Redis pub / sub)从节点接收结果,对它们进行数学汇总并生成视图的最终SSE流?在那种情况下,主服务将使用间隔约一千的无数个数字来产生范围边界,并行节点可使用该范围来初始化非无限Clojure序列(可能仍然很懒)?当然,在这种情况下,我需要标记进入的序列范围是完整的,并在范围处理期间出现节点故障的情况下,采用重试策略。
我正在研究Kubernetes状态集,以熟悉状态服务的部署模式,尽管我还没有遇到适合此特定问题的模式或解决方案。如果这是一种无状态服务,那么Kubernetes解决方案将是显而易见的,但是有状态方法却使我在Kubernetes环境中处于空白状态。
有人能为我指出这里的建筑方向吗?假设我确实希望保持封装在Clojure惰性序列(即本地服务内存)中的系列术语的状态,那么我在划分工作的策略上是否走对了轨道?
这里是单节点Clojure服务的相关代码:
(ns server-sent-events.service (:require [io.pedestal.http :as http] [io.pedestal.http.sse :as sse] [io.pedestal.http.route :as route] [io.pedestal.http.route.definition :refer [defroutes]] [ring.util.response :as ring-resp] [clojure.core.async :as async] ) ) (defn seq-of-terms [func] (map func (iterate (partial + 1) 0)) ) (defn euler-term [n] (let [current (+ n 1)] (/ 6.0 (* current current))) ) ; The following returns a lazy list representing iterable sums that estimate pi ; according to the Euler series for increasing amounts of terms in the series. ; Sample usage: (take 100 euler-reductions) (def euler-reductions (map (fn [sum] (Math/sqrt sum)) (reductions + (seq-of-terms euler-term) )) ) (defn leibniz-term [n] ; starts at zero (let [ oddnum (+ (* 2.0 n) 1.0) signfactor (- 1 (* 2 (mod n 2))) ] (/ (* 4.0 signfactor) oddnum) ) ) ; The following returns a lazy list representing iterable sums that estimate pi ; according to the Leibniz series for increasing amounts of terms in the series. ; Sample usage: (take 100 leibniz-reductions) (def leibniz-reductions (reductions + (seq-of-terms leibniz-term))) (defn send-result [event-ch count-num rdcts] (doseq [item rdcts] (Thread/sleep 150) ; we must use a naive throttle here to prevent an overflow on the core.async CSP channel, event-ch (async/put! event-ch (str item)) ) ) (defn sse-euler-stream-ready "Start to send estimates to the client according to the Euler series" [event-ch ctx] ;; The context is passed into this function. (let [ {:keys [request response-channel]} ctx lazy-list euler-reductions ] (send-result event-ch 10 lazy-list) ) ) (defn sse-leibniz-stream-ready "Start to send estimates to the client according to the Leibniz series" [event-ch ctx] (let [ {:keys [request response-channel]} ctx lazy-list leibniz-reductions ] (send-result event-ch 10 lazy-list) ) ) ;; Wire root URL to sse event stream ;; with custom event-id setting (defroutes routes [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]} ["/euler" {:get [::send-result (sse/start-event-stream sse-euler-stream-ready)]}] ["/leibniz" {:get [::send-result-leibniz (sse/start-event-stream sse-leibniz-stream-ready)]}] ]]]) (def url-for (route/url-for-routes routes)) (def service {:env :prod ::http/routes routes ;; Root for resource interceptor that is available by default. ::http/resource-path "/public" ;; Either :jetty or :tomcat (see comments in project.clj ;; to enable Tomcat) ::http/type :jetty ::http/port 8080 ;;::http/allowed-origins ["http://127.0.0.1:8081"] } )
完整代码在https://github.com/wclark-aburra-code/pi-service。内联Vue.js代码,它占用SSE流。
我编写了一个Web服务,使用Clojure中的惰性序列和各种无限序列公式(Euler,Leibniz)来生成Pi的估计值。 Clojure服务通过服务器发送的这些估计值发送...
如果只是为了扩展,我认为您不需要坚持任何措施。您需要的只是一个调度“主”(可能是客户端本身),以从多个后端请求分块的序列,并重新组装它们以正确的顺序进行交付。