我正在攻击node.js程序,该程序捕获SMTP邮件并对邮件数据进行操作。 'smtp-protocol'节点库以流形式提供邮件数据,作为node.js新手,我不知道如何将该流写入字符串变量。我目前使用该行写入stdout:
stream.pipe(process.stdout, { end : false });
正如我所说,我需要将此流数据写入字符串变量,然后在流结束后我将使用它。
非常感谢!
关键是使用这两个Stream events:
对于stream.on('data', ...)
,您应该将数据数据收集到Buffer(如果是二进制)或字符串中。
对于on('end', ...)
,您应该使用已完成的缓冲区调用回调,或者如果您可以内联它并使用Promises库返回。
那么像流减速器呢?
下面是使用ES6类的示例如何使用一个。
var stream = require('stream')
class StreamReducer extends stream.Writable {
constructor(chunkReducer, initialvalue, cb) {
super();
this.reducer = chunkReducer;
this.accumulator = initialvalue;
this.cb = cb;
}
_write(chunk, enc, next) {
this.accumulator = this.reducer(this.accumulator, chunk);
next();
}
end() {
this.cb(null, this.accumulator)
}
}
// just a test stream
class EmitterStream extends stream.Readable {
constructor(chunks) {
super();
this.chunks = chunks;
}
_read() {
this.chunks.forEach(function (chunk) {
this.push(chunk);
}.bind(this));
this.push(null);
}
}
// just transform the strings into buffer as we would get from fs stream or http request stream
(new EmitterStream(
["hello ", "world !"]
.map(function(str) {
return Buffer.from(str, 'utf8');
})
)).pipe(new StreamReducer(
function (acc, v) {
acc.push(v);
return acc;
},
[],
function(err, chunks) {
console.log(Buffer.concat(chunks).toString('utf8'));
})
);
最干净的解决方案可能是使用“string-stream”包,它将流转换为带有promise的字符串。
const streamString = require('stream-string')
streamString(myStream).then(string_variable => {
// myStream was converted to a string, and that string is stored in string_variable
console.log(string_variable)
}).catch(err => {
// myStream emitted an error event (err), so the promise from stream-string was rejected
throw err
})
塞巴斯蒂安J上面做得好。
我有几行测试代码的“缓冲问题”,并添加了编码信息并解决了它,见下文。
软件
// process.stdin.setEncoding('utf8');
process.stdin.on('data', (data) => {
console.log(typeof(data), data);
});
输入
hello world
产量
object <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0d 0a>
软件
process.stdin.setEncoding('utf8'); // <- Activate!
process.stdin.on('data', (data) => {
console.log(typeof(data), data);
});
输入
hello world
产量
string hello world
这对我有用,基于Node v6.7.0 docs:
let output = '';
stream.on('readable', function() {
let read = stream.read();
if (read !== null) {
// New stream data is available
output += read.toString();
} else {
// Stream is now finished when read is null.
// You can callback here e.g.:
callback(null, output);
}
});
stream.on('error', function(err) {
callback(err, null);
})
使用您可能已经在项目依赖项中使用的quite popular stream-buffers
package,这非常简单:
// imports
const { WritableStreamBuffer } = require('stream-buffers');
const { promisify } = require('util');
const { createReadStream } = require('fs');
const pipeline = promisify(require('stream').pipeline);
// sample stream
let stream = createReadStream('/etc/hosts');
// pipeline the stream into a buffer, and print the contents when done
let buf = new WritableStreamBuffer();
pipeline(stream, buf).then(() => console.log(buf.getContents().toString()));
基于@Ivan回答,创建一个Promise:
var text = await new Promise(function(resolve, reject) {
try {
stream.setEncoding('utf-8').on('data', function(data) {
resolve(data)
})
} catch(e) {
reject(e)
}
});
console.log(text)
希望这比上面的答案(现在有一个断开的链接)更有用。
另请注意,字符串连接不是收集字符串部分的有效方法,但它用于简单(也许您的代码不关心效率)
var string = '';
stream.on('data',function(data){
string += data.toString();
console.log('stream data ' + part);
});
stream.on('end',function(){
console.log('final output ' + string);
});
以上都不适合我。我需要使用Buffer对象:
const chunks = [];
readStream.on("data", function (chunk) {
chunks.push(chunk);
});
// Send the buffer or you can put it into a var
readStream.on("end", function () {
res.send(Buffer.concat(chunks));
});
另一种方法是将流转换为promise(请参阅下面的示例)并使用then
(或await
)将已解析的值分配给变量。
function streamToString (stream) {
const chunks = []
return new Promise((resolve, reject) => {
stream.on('data', chunk => chunks.push(chunk))
stream.on('error', reject)
stream.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')))
})
}
const result = await streamToString(stream)
我通常使用这个简单的函数将流转换为字符串:
function streamToString(stream, cb) {
const chunks = [];
stream.on('data', (chunk) => {
chunks.push(chunk.toString());
});
stream.on('end', () => {
cb(chunks.join(''));
});
}
用法示例:
let stream = fs.createReadStream('./myFile.foo');
streamToString(stream, (data) => {
console.log(data); // data is now my string variable
});
从nodejs documentation你应该这样做 - 永远记住一个字符串,而不知道编码只是一堆字节:
var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
assert.equal(typeof chunk, 'string');
console.log('got %d characters of string data', chunk.length);
})
Streams没有简单的.toString()
函数(我理解),也没有像.toStringAsync(cb)
函数(我不明白)。
所以我创建了自己的辅助函数:
var streamToString = function(stream, callback) {
var str = '';
stream.on('data', function(chunk) {
str += chunk;
});
stream.on('end', function() {
callback(str);
});
}
// how to use:
streamToString(myStream, function(myStr) {
console.log(myStr);
});
流行(每周下载量超过500万)和轻量级get-stream库的简便方法:
https://www.npmjs.com/package/get-stream
const fs = require('fs');
const getStream = require('get-stream');
(async () => {
const stream = fs.createReadStream('unicorn.txt');
console.log(await getStream(stream)); //output is string
})();
我有这样的运气:
let string = '';
readstream
.on('data', (buf) => string += buf.toString())
.on('end', () => console.log(string));
我使用节点v9.11.1
和readstream
是来自http.get
回调的响应。