我正在尝试通过JSONStream.parse()来管道输入流(从巨大的GeoJSON文件创建)以将流分解为对象,然后通过event-stream.map()以允许我转换对象,然后通过JSONStream .stringify()从中创建一个字符串,最后创建一个可写的输出流。随着进程的运行,我可以看到节点的内存占用量继续增长,直到它最终耗尽堆。这是重新创建问题的最简单的脚本(test.js):
const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")
out = fs.createWriteStream("/dev/null")
process.stdin
.pipe(js.parse("features.*"))
.pipe(es.map( function(data, cb) {
cb(null, data);
return;
} ))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out)
一个小的bash脚本(barf.sh)将无尽的JSON流注入节点的process.stdin将导致节点的堆逐渐增长:
#!/bin/bash
echo '{"type":"FeatureCollection","features":['
while :
do
echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done
通过运行它:
barf.sh | node test.js
有几种奇怪的方法来回避这个问题:
前两个操作中的任何一个都将允许脚本永远运行,节点的内存占用低且不变。我在八核机器上运行节点v6.3.1,事件流v3.3.4和JSONStream 1.1.4,其中8GB内存运行Ubuntu 16.04。
我希望有人可以帮我纠正我确信这是我的一个明显错误。
JSONStream不是streams2流,因此它不支持背压。 (有关streams2 here的简短摘要。)
这意味着数据将来自parse
事件中的data
流,并且流将继续将它们抽出来,无论消耗流是否为它们做好准备。如果管道中某处可以读取和写入的速度之间存在一些差异,那么就会有缓冲 - 这就是您所看到的。
你的barf.sh
线束看到通过stdin
泵入的功能。相反,如果您正在读取大量文件,则应该能够通过暂停文件的读取流来管理流程。因此,如果你要在你的pause/resume
回调中插入一些map
逻辑,你应该能够让它处理一个庞大的文件;它只需要一点时间。我会尝试这样的事情:
let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
.pipe(js.parse("features.*"))
.pipe(es.map(function(data, cb) {
// This is just an example; a 10-millisecond wait per feature would be very slow.
if (!in.isPaused()) {
in.pause();
global.setTimeout(function () { in.resume(); }, 10);
}
cb(null, data);
return;
}))
.pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
.pipe(out);
很明显,使用mapSync
在我的计算机上做的几乎没有差别(这是旧的和慢的)。但是,除非你在map
中执行一些异步操作,否则我会选择mapSync
。