使用 Node.js 写入大文件

问题描述 投票:0回答:8

我正在使用node.js使用可写流编写一个大文件:

var fs     = require('fs');
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' });

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write( lines[i] );
    }
}

我想知道如果不使用

drain
事件,这个方案是否安全?如果不是(我认为是这种情况),将任意大数据写入文件的模式是什么?

node.js large-files
8个回答
29
投票

我最终就是这么做的。背后的想法是创建实现 ReadStream 接口的可读流,然后使用

pipe()
方法将数据传输到可写流。

var fs = require('fs');
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' });
var readStream = new MyReadStream();

readStream.pipe(writeStream);
writeStream.on('close', function () {
    console.log('All done!');
});

MyReadStream
类的示例可以取自 mongoose QueryStream


13
投票

排水背后的想法是你可以用它来测试:

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        stream.write(lines[i]); //<-- the place to test
    }
}

但你不是。所以你需要重新架构以使其“可重入”。

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines;
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
        }
    }
}

但是,这是否意味着您在等待时还需要继续缓冲 getLines?

var fs = require('fs');
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'});

var lines,
    buffer = {
     remainingLines = []
    };
while (lines = getLines()) {
    for (var i = 0; i < lines.length; i++) {
        var written = stream.write(lines[i]); //<-- the place to test
        if (!written){
           //do something here to wait till you can safely write again
           //this means prepare a buffer and wait till you can come back to finish
           //  lines[i] -> remainder
           buffer.remainingLines = lines.slice(i);
           break;
           //notice there's no way to re-run this once we leave here.
        }
    }
}

stream.on('drain',function(){
  if (buffer.remainingLines.length){
    for (var i = 0; i < buffer.remainingLines.length; i++) {
      var written = stream.write(buffer.remainingLines[i]); //<-- the place to test
      if (!written){
       //do something here to wait till you can safely write again
       //this means prepare a buffer and wait till you can come back to finish
       //  lines[i] -> remainder
       buffer.remainingLines = lines.slice(i);
      }
    }
  }
});

7
投票

处理此问题的最简洁方法是使行生成器成为可读流 - 我们称之为

lineReader
。然后,以下内容将自动为您处理缓冲区并排出:

lineReader.pipe(fs.createWriteStream('someFile.txt'));

如果您不想创建可读流,您可以监听

write
的输出以了解缓冲区满度并按如下方式响应:

var i = 0, n = lines.length;
function write () {
  if (i === n) return;  // A callback could go here to know when it's done.
  while (stream.write(lines[i++]) && i < n);
  stream.once('drain', write);
}
write();  // Initial call.

可以在此处找到这种情况的更长示例。


4
投票

我发现流处理大文件的性能很差 - 这是因为您无法设置足够的输入缓冲区大小(至少我不知道有什么好的方法可以做到这一点)。这就是我所做的:

var fs = require('fs');

var i = fs.openSync('input.txt', 'r');
var o = fs.openSync('output.txt', 'w');

var buf = new Buffer(1024 * 1024), len, prev = '';

while(len = fs.readSync(i, buf, 0, buf.length)) {

    var a = (prev + buf.toString('ascii', 0, len)).split('\n');
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : '';

    var out = '';
    a.forEach(function(line) {

        if(!line)
            return;

        // do something with your line here

        out += line + '\n';
    });

    var bout = new Buffer(out, 'ascii');
    fs.writeSync(o, bout, 0, bout.length);
}

fs.closeSync(o);
fs.closeSync(i);

3
投票

这个问题的几个建议答案完全忽略了关于流的要点。

这个模块可以帮助https://www.npmjs.org/package/JSONStream

但是,让我们假设所描述的情况并自己编写代码。您正在以流的形式从 MongoDB 中读取数据,默认情况下 ObjectMode = true。

如果您尝试直接流式传输到文件,这将导致问题 - 类似“无效的非字符串/缓冲区块”错误。

此类问题的解决方法非常简单。

只需在可读和可写之间放置另一个 Transform 即可将可读对象适当地调整为可写字符串。

示例代码解决方案:

var fs = require('fs'),
    writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }),
    stream = require('stream'),
    stringifier = new stream.Transform();
stringifier._writableState.objectMode = true;
stringifier._transform = function (data, encoding, done) {
    this.push(JSON.stringify(data));
    this.push('\n');
    done();
}
rowFeedDao.getRowFeedsStream(merchantId, jobId)
.pipe(stringifier)
.pipe(writeStream).on('error', function (err) {
   // handle error condition
}

2
投票

[编辑] 更新的 Node.js

writable.write(...)
API 文档 说:

[]返回值仅供参考。即使返回 false,您也可以继续写入。不过,写入操作会在内存中进行缓冲,因此最好不要过度执行此操作。相反,在写入更多数据之前等待耗尽事件。

[原始]来自

stream.write(...)
文档(强调我的):

如果字符串已刷新到内核缓冲区,则返回

true
。返回
false
,表示内核缓冲区已满,数据会在以后发送出去

我将此解释为,如果给定的字符串立即写入底层操作系统缓冲区,则“write”函数返回

true
;如果尚未写入,则返回
false
,但 将由 write 函数写入(例如可能是通过 WriteStream 为您缓冲的),这样您就不必再次调用“write”。


1
投票

如果您没有输入流,则无法轻松使用管道。 以上都不适合我,排水事件不会触发。解决如下(基于泰勒斯的回答):

var lines[]; // some very large array
var i = 0;

function write() {
    if (i < lines.length)  {
        wstream.write(lines[i]), function(err){
            if (err) {
                console.log(err);
            } else {
                i++;
                write();
            }
        });
    } else {
        wstream.end();
        console.log("done");
    }
};
write();

0
投票

这是现代 Node.js 中使用生成器函数

Readable.from
pipeline
的简单解决方案。无需直接与 Streams 交互。

import { createWriteStream } from "fs";
import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

function* generate() {
  for (const line of lines) {
    yield line;
    yield '\n';
  }
}

await pipeline(
  Readable.from(generate()),
  createWriteStream("someFile.txt"),
);
© www.soinside.com 2019 - 2024. All rights reserved.