Understanding core.async merge in Clojure and ClojureScript

Question (votes: 1, answers: 1)

I'm experimenting with core.async on Clojure and ClojureScript, trying to understand how merge works. In particular, whether merge makes a value available on the merged channel as soon as it arrives on any of the input channels.

I have the following code:

(ns async-merge-example.core
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   [async-merge-example.exec :as exec]))

(defn async-fn-timeout
  [v]
  (async/go
    (async/<! (async/timeout (rand-int 5000)))
    v))

(defn async-fn-exec
  [v]
  (exec/exec "sh" "-c" (str "sleep " (rand-int 5) "; echo " v ";")))

(defn merge-and-print-results
  [seq async-fn]
  (let [chans (async/merge (map async-fn seq))]
    (async/go
      (while (when-let [v (async/<! chans)]
               (prn v)
               v)))))

When I try async-fn-timeout with a large seq:

(merge-and-print-results (range 20) async-fn-timeout)

For both Clojure and ClojureScript I get the result I expect, in that the results start printing quickly, with the expected random delays.

However, when I try the same seq with async-fn-exec:

(merge-and-print-results (range 20) async-fn-exec)

For ClojureScript I get the result I expect, in that the results start printing almost immediately, with the expected delays. But for Clojure, even though the sh processes execute concurrently (limited by the size of the core.async thread pool), the results seem to be delayed at first and then mostly printed all at once! I can make this difference more pronounced by increasing the size of the seq, e.g. (range 40).

Since the results for async-fn-timeout are consistent between Clojure and ClojureScript, this points to a difference between the Clojure and ClojureScript implementations of exec.

But I don't see why that difference would cause this behavior.

Notes:

  • These observations were made in WSL on Windows 10
  • The source for async-merge-example.exec is below
  • In exec, the Clojure and ClojureScript implementations differ because of differences between Clojure/Java and ClojureScript/NodeJS.

(ns async-merge-example.exec
  (:require
   #?(:clj [clojure.core.async :as async] :cljs [cljs.core.async :as async])
   ;; needed for clojure.java.io/reader below
   #?(:clj [clojure.java.io])))

; cljs implementation based on https://gist.github.com/frankhenderson/d60471e64faec9e2158c

; clj implementation based on https://stackoverflow.com/questions/45292625/how-to-perform-non-blocking-reading-stdout-from-a-subprocess-in-clojure

#?(:cljs (def spawn (.-spawn (js/require "child_process"))))

#?(:cljs
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan), p (spawn cmd (if args (clj->js args) (clj->js [])))]
       (.on (.-stdout p) "data"  #(async/put! c [:out  (str %)]))
       (.on (.-stderr p) "data"  #(async/put! c [:err  (str %)]))
       (.on p            "close" #(async/put! c [:exit (str %)]))
       c)))

#?(:clj
   (defn exec-chan
     "spawns a child process for cmd with args. routes stdout, stderr, and
      the exit code to a channel. returns the channel immediately."
     [cmd args]
     (let [c (async/chan)]
       (async/go
         (let [builder (ProcessBuilder. (into-array String (cons cmd (map str args))))
               process (.start builder)]
           (with-open [reader (clojure.java.io/reader (.getInputStream process))
                       err-reader (clojure.java.io/reader (.getErrorStream process))]
             (loop []
               (let [line (.readLine ^java.io.BufferedReader reader)
                     err (.readLine ^java.io.BufferedReader err-reader)]
                 (if (or line err)
                   (do (when line (async/>! c [:out line]))
                       (when err (async/>! c [:err err]))
                       (recur))
                   (do
                     (.waitFor process)
                     (async/>! c [:exit (.exitValue process)]))))))))
       c)))

(defn exec
  "executes cmd with args. returns a channel immediately which
   will eventually receive a result map of 
   {:out [stdout-lines] :err [stderr-lines] :exit [exit-code]}"
  [cmd & args]
  (let [c (exec-chan cmd args)]
    (async/go (loop [output (async/<! c) result {}]
                (if (= :exit (first output))
                  (assoc result :exit (second output))
                  (recur (async/<! c) (update result (first output) #(conj (or % []) (second output)))))))))
1 Answer (score: 2)

Your Clojure implementation uses blocking IO on a single thread. You first read from stdout, and then from stderr, in a loop. Both readLine calls block, so they only return once they have actually read a line. So unless your process produces the same amount of output on stdout and stderr, one stream will end up blocking the other.

Once the process has finished, readLine no longer blocks and returns nil as soon as the buffer is empty. So the loop just finishes reading the buffered output and then finally completes, which explains the "all at once" prints.

You probably want to start a second thread that handles reading stderr.
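A sketch of that fix, keeping the exec-chan contract from the question but pumping each stream on its own thread so neither readLine can block the other (the two-thread split and the pump helper are my own, not from the answer):

```clojure
(ns async-merge-example.exec-fixed
  (:require [clojure.core.async :as async]
            [clojure.java.io :as io]))

(defn exec-chan
  "Spawns a child process for cmd with args; routes stdout, stderr, and
   the exit code to a channel, reading each stream on its own thread.
   Returns the channel immediately."
  [cmd args]
  (let [c (async/chan)
        process (.start (ProcessBuilder.
                         (into-array String (cons cmd (map str args)))))
        ;; One real thread per stream; async/thread returns a channel
        ;; that closes when its body reaches EOF.
        pump (fn [stream tag]
               (async/thread
                 (with-open [^java.io.BufferedReader rdr (io/reader stream)]
                   (loop []
                     (when-let [line (.readLine rdr)]
                       (async/>!! c [tag line])
                       (recur))))))
        out-done (pump (.getInputStream process) :out)
        err-done (pump (.getErrorStream process) :err)]
    (async/go
      ;; Wait for both readers to hit EOF, then report the exit code.
      (async/<! out-done)
      (async/<! err-done)
      (async/>! c [:exit (async/<! (async/thread (.waitFor process)))])
      (async/close! c))
    c))
```

With this variant, lines appear on the channel as soon as either stream produces them, so the merged output should stream incrementally on the JVM as well.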

node does not have blocking IO, so everything happens asynchronously by default and one stream does not block the other.
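To confirm that merge itself is not the bottleneck — it forwards values as soon as any input channel produces one — here is a minimal REPL check (Clojure shown; the channel names are illustrative):

```clojure
(require '[clojure.core.async :as async])

;; Two input channels that deliver at very different times.
(let [fast   (async/go (async/<! (async/timeout 10)) :fast)
      slow   (async/go (async/<! (async/timeout 500)) :slow)
      merged (async/merge [fast slow])]
  ;; :fast comes off the merged channel long before :slow is ready,
  ;; so any "all at once" behavior must come from the producers.
  [(async/<!! merged) (async/<!! merged)])
;; => [:fast :slow]
```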
