我正在尝试逐行将实时websocket提要写入文件-我认为为此,我应该使用可写流。
我的问题是,接收到的数据在每秒10行的范围内,这会很快填满缓冲区。
我了解使用来自您控制的源的流时,通常会在此处添加某种背压逻辑,但是如果不控制源,该怎么办?我应该分批写入,一次写500行,而不是每行,还是应该使用其他方式保存此数据?
我想知道线条多大?每秒10行的声音听起来微不足道,以至于流到磁盘上,除非行数巨大或磁盘速度非常慢。最终,如果您无法应用反压逻辑,那么如果它们变快或存储变慢,源将使您不知所措,您必须决定可以合理缓冲的容量,如果得到的话最终只是丢弃一些数据后面。
但是,您应该能够写入大量数据。在我的普通硬盘上(使用下面的通用流代码,而没有其他缓冲),我可以以55 MBytes / sec的速度顺序写入100,000,000个字节:
因此,如果每秒有10行输入,只要每行低于10,000,000字节,我的硬盘驱动器就可以跟上。
这是我用来测试的代码:
const fs = require('fs');
const { Bench } = require('../../Github/measure');
const { addCommas } = require("../../Github/str-utils");
const lineData = Buffer.from("012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n", 'utf-8');
let stream = fs.createWriteStream("D:\\Temp\\temp.txt");
stream.on('open', function() {
let linesRemaining = 1_000_000;
let b = new Bench();
let bytes = 0;
function write() {
do {
linesRemaining--;
let readyMore;
bytes += lineData.length;
if (linesRemaining === 0) {
readyForMore = stream.write(lineData, done);
} else {
readyForMore = stream.write(lineData);
}
} while (linesRemaining > 0 && readyForMore);
if (linesRemaining > 0) {
stream.once('drain', write);
}
}
function done() {
b.markEnd();
console.log(`Time to write ${addCommas(bytes)} bytes: ${b.formatSec(3)}`);
console.log(`bytes/sec = ${addCommas((bytes/b.sec).toFixed(0))}`);
console.log(`MB/sec = ${addCommas(((bytes/(1024 * 1024))/b.sec).toFixed(1))}`);
stream.end();
}
b.markBegin();
write();
});
从理论上讲,磁盘进行较少的较大写操作要比大量的小型写操作效率更高。实际上,由于writeStream的工作方式,效率低下的写入速度变慢时,下一次写入将被缓冲,并且会进行自我更正。如果您确实想最大程度地减少磁盘上的负载,则可以缓冲写入操作,直到您至少要写入4k为止。问题在于,每次写入都可能会为文件分配一些字节(这涉及到写入磁盘上的表),然后查找应将字节写入磁盘的位置,然后再写入字节。更少且更大的写入(更大的限制取决于内部实现)将减少它必须执行文件分配开销的次数。
所以,我进行了测试。我修改了上面的代码(如下所示)以缓冲到4k块中,并以4k块的形式写出它们。写入速度从55 MB /秒增加到284.2 MB /秒。
因此,理论认为,如果缓冲成更大的块,编写速度会更快。
但是,即使是更简单的非缓冲版本也可能非常快。
这是缓冲版本的测试代码:
const fs = require('fs');
const { Bench } = require('../../Github/measure');
const { addCommas } = require("../../Github/str-utils");
const lineData = Buffer.from("012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678\n", 'utf-8');
let stream = fs.createWriteStream("D:\\Temp\\temp.txt");
stream.on('open', function() {
let linesRemaining = 1_000_000;
let b = new Bench();
let bytes = 0;
let cache = [];
let cacheTotal = 0;
const maxBuffered = 4 * 1024;
stream.myWrite = function(data, callback) {
if (callback) {
cache.push(data);
return stream.write(Buffer.concat(cache), callback);
} else {
cache.push(data);
cacheTotal += data.length;
if (cacheTotal >= maxBuffered) {
let ready = stream.write(Buffer.concat(cache));
cache.length = 0;
cacheTotal = 0;
return ready;
} else {
return true;
}
}
}
function write() {
do {
linesRemaining--;
let readyMore;
bytes += lineData.length;
if (linesRemaining === 0) {
readyForMore = stream.myWrite(lineData, done);
} else {
readyForMore = stream.myWrite(lineData);
}
} while (linesRemaining > 0 && readyForMore);
if (linesRemaining > 0) {
stream.once('drain', write);
}
}
function done() {
b.markEnd();
console.log(`Time to write ${addCommas(bytes)} bytes: ${b.formatSec(3)}`);
console.log(`bytes/sec = ${addCommas((bytes/b.sec).toFixed(0))}`);
console.log(`MB/sec = ${addCommas(((bytes/(1024 * 1024))/b.sec).toFixed(1))}`);
stream.end();
}
b.markBegin();
write();
});
此代码使用我的几个本地库来测量时间并格式化输出。如果您想自己运行它,则可以用自己的逻辑代替。