将JSONStream.parsed()数据通过es.map()和JSONStream.stringify()传递给文件流时,节点堆已耗尽

问题描述 投票:1回答:1

我正在尝试通过JSONStream.parse()来管道输入流(从巨大的GeoJSON文件创建)以将流分解为对象,然后通过event-stream.map()以允许我转换对象,然后通过JSONStream .stringify()从中创建一个字符串,最后创建一个可写的输出流。随着进程的运行,我可以看到节点的内存占用量继续增长,直到它最终耗尽堆。这是重新创建问题的最简单的脚本(test.js):

const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb) { 
        cb(null, data);
        return;
    } ))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)

一个小的bash脚本(barf.sh)将无尽的JSON流注入节点的process.stdin将导致节点的堆逐渐增长:

#!/bin/bash

echo '{"type":"FeatureCollection","features":['
while :
do
    echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done

通过运行它:

barf.sh | node test.js

有几种奇怪的方法来回避这个问题:

  • 删除fs.createWriteStream()并将最后一个管道阶段从“.pipe(out)”更改为“.pipe(process.stdout)”,然后将管道节点的stdout更改为/ dev / null
  • 将异步es.map()更改为同步es.mapSync()

前两个操作中的任何一个都将允许脚本永远运行,节点的内存占用低且不变。我在八核机器上运行节点v6.3.1,事件流v3.3.4和JSONStream 1.1.4,其中8GB内存运行Ubuntu 16.04。

我希望有人可以帮我纠正我确信这是我的一个明显错误。

node.js event-stream jsonstream
1个回答
3
投票

JSONStream不是streams2流,因此它不支持背压。 (有关streams2 here的简短摘要。)

这意味着数据将来自parse事件中的data流,并且流将继续将它们抽出来,无论消耗流是否为它们做好准备。如果管道中某处可以读取和写入的速度之间存在一些差异,那么就会有缓冲 - 这就是您所看到的。

你的barf.sh线束看到通过stdin泵入的功能。相反,如果您正在读取大量文件,则应该能够通过暂停文件的读取流来管理流程。因此,如果你要在你的pause/resume回调中插入一些map逻辑,你应该能够让它处理一个庞大的文件;它只需要一点时间。我会尝试这样的事情:

let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
    .pipe(js.parse("features.*"))
    .pipe(es.map(function(data, cb) {
        // This is just an example; a 10-millisecond wait per feature would be very slow.
        if (!in.isPaused()) {
            in.pause();
            global.setTimeout(function () { in.resume(); }, 10);
        }
        cb(null, data);
        return;
    }))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out);

很明显,使用mapSync在我的计算机上做的几乎没有差别(这是旧的和慢的)。但是,除非你在map中执行一些异步操作,否则我会选择mapSync

© www.soinside.com 2019 - 2024. All rights reserved.