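I'm experimenting with core.async on Clojure and ClojureScript, trying to understand how `merge` works. In particular, I want to know whether `merge` makes a value put on any input channel immediately available on the merged channel.

I have the following code: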
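```clojure
(ns async-merge-example.core
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   [async-merge-example.exec :as exec]))

;; Returns a channel that yields v after a random delay of up to 5 seconds.
(defn async-fn-timeout
  [v]
  (async/go
    (async/<! (async/timeout (rand-int 5000)))
    v))

;; Returns a channel that yields the result of a shell process that sleeps
;; for up to 4 seconds and then echoes v.
(defn async-fn-exec
  [v]
  (exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))

(defn merge-and-print-results
  [seq async-fn]
  (let [chans (async/merge (map async-fn seq))]
    (async/go
      ;; Print each value as it arrives; stop once the merged channel closes.
      (while (when-let [v (async/<! chans)]
               (prn v)
               v)))))
```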
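When I try it with a large `seq` using `async-fn-timeout`:

```clojure
(merge-and-print-results (range 20) async-fn-timeout)
```

I get the result I expect on both Clojure and ClojureScript: results start printing almost immediately, with the expected delays.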
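However, when I try the same `seq` with `async-fn-exec`:

```clojure
(merge-and-print-results (range 20) async-fn-exec)
```

ClojureScript again gives the expected result: results start printing almost immediately, with the expected delays. With Clojure, however, even though the `sh` processes run concurrently (limited by the size of the `core.async` thread pool), the results seem to be held back at first and then mostly printed all at once! I can make the difference more pronounced by increasing the size of the seq, e.g. `(range 40)`.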
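Since `async-fn-timeout` behaves the same on Clojure and ClojureScript, this points to a difference between the Clojure and ClojureScript implementations of `exec`.

But I don't understand why that difference would cause this problem.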
Notes:

The source of `async-merge-example.exec` is below. The Clojure and ClojureScript implementations of `exec` differ because of the differences between Clojure/Java and ClojureScript/NodeJS.

```clojure
(ns async-merge-example.exec
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   #?(:clj [clojure.java.io])))

; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c
; clj implementation based on https://stackoverflow.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure

#?(:cljs (def spawn (.-spawn (js/require "child_process"))))

#?(:cljs
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
     the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
       ;; Node delivers output through callbacks, so nothing here blocks.
       (.on (.-stdout p) "data" #(async/put! c [:out (str %)]))
       (.on (.-stderr p) "data" #(async/put! c [:err (str %)]))
       (.on p "close" #(async/put! c [:exit (str %)]))
       c)))

#?(:clj
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
     the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan)]
       (async/go
         (let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
               process (.start builder)]
           (with-open [reader (clojure.java.io/reader (.getInputStream process))
                       err-reader (clojure.java.io/reader (.getErrorStream process))]
             ;; Alternately read one line of stdout and one line of stderr.
             (loop []
               (let [line (.readLine ^java.io.BufferedReader reader)
                     err (.readLine ^java.io.BufferedReader err-reader)]
                 (if (or line err)
                   (do (when line (async/>! c [:out line]))
                       (when err (async/>! c [:err err]))
                       (recur))
                   (do
                     (.waitFor process)
                     (async/>! c [:exit (.exitValue process)]))))))))
       c)))

(defn exec
  "executes cmd with args. returns a channel immediately which
  will eventually receive a result map of
  {:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
  [cmd & args]
  (let [c (exec-chan cmd args)]
    ;; Accumulate :out/:err messages until the :exit message arrives.
    (async/go (loop [output (async/<! c) result {}]
                (if (= :exit (first output))
                  (assoc result :exit (second output))
                  (recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))
```
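Your Clojure implementation uses blocking IO on a single thread. Inside the loop you first read a line from stdout, then a line from stderr. Both `readLine` calls block, so each returns only after it has actually read a complete line (or reached end of stream). So unless your process writes the same amount of output to stdout and stderr, one stream ends up blocking the other.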
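Once the process has finished, `readLine` no longer blocks: it returns whatever lines are still buffered, and then `nil` once the buffer is empty. So the loop simply drains the buffered output and then completes, which explains why the results arrive "all at once".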
You probably want to start a second thread that handles reading stderr.
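A minimal sketch of that suggestion, for the Clojure (JVM) side only: each stream gets its own reader thread via `async/thread`, so stdout and stderr drain independently of each other. Using `async/thread` instead of `go` also keeps the blocking `readLine` calls off the fixed-size `go` dispatch pool, which the other `go` blocks (such as the ones in `exec` and `merge-and-print-results`) need in order to run. The namespace `async-merge-example.exec-threads` and the helper `pump-lines` are hypothetical names; `exec-chan` keeps the original signature and message shapes (`[:out line]`, `[:err line]`, `[:exit code]`), so the existing `exec` should be able to consume it unchanged.

```clojure
(ns async-merge-example.exec-threads ; hypothetical namespace for this sketch
  (:require [clojure.core.async :as async]
            [clojure.java.io :as io]))

(defn- pump-lines
  "Hypothetical helper: reads `stream` line by line on its own thread and puts
  [tag line] on channel `c`. Returns the channel from async/thread, which
  closes once the stream reaches EOF."
  [stream tag c]
  ;; async/thread, not go: readLine blocks, and blocking IO inside a go block
  ;; would tie up one of the go dispatch pool's threads.
  (async/thread
    (with-open [^java.io.BufferedReader r (io/reader stream)]
      (loop []
        (when-let [line (.readLine r)]
          (async/>!! c [tag line]) ; a blocking put is fine on a dedicated thread
          (recur))))))

(defn exec-chan
  "Spawns cmd with args and routes stdout, stderr and the exit code to a
  channel, which is returned immediately."
  [cmd args]
  (let [c        (async/chan)
        process  (.start (ProcessBuilder. (into-array String (cons cmd (map str args)))))
        out-done (pump-lines (.getInputStream process) :out c)
        err-done (pump-lines (.getErrorStream process) :err c)]
    (async/thread
      ;; Wait until both streams are fully drained, then report the exit code.
      (async/<!! out-done)
      (async/<!! err-done)
      (async/>!! c [:exit (.waitFor process)]))
    c))
```

The trade-off is three threads per process instead of one `go` block; `async/thread` draws from an unbounded pool, so this is reasonable for dozens of processes but worth reconsidering for thousands.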
`node` has no blocking IO, so by default everything happens asynchronously and one stream never blocks the other.