我写了一些恰好起作用的代码,但我认为readable.destroy()
(例如,src.destroy()
,在我的例子中)应该发出close
和error
事件......
这里有一个很小的例子来说明我的困惑:
const fs = require('fs');
const src = fs.createReadStream('streaming-example.js');
const dst = fs.createWriteStream('streaming-example.txt');
src.pipe(dst);
src.on('readable', () => {
let chunk;
while (null !== (chunk = src.read())) {
/**
* This should cause 'error' and 'close' events to emit
* @see https://nodejs.org/dist/latest-v11.x/docs/api/stream.html#stream_readable_destroy_error
*/
src.destroy();
}
});
src.on('close', () => console.log(`'close' event emitted`));
src.on('end', () => console.log(`'end' event emitted`));
src.on('error', (err) => console.log(`'error' event emitted`));
这是该程序的示例运行:
$ node streaming-example.js
'close' event emitted
$
(并且,它恰好也写完了一个名为streaming-example.txt
的新文件)
如果不清楚,我预计会发出close
和error
事件,然后触发相应的回调。但是,似乎只发布了close
事件。
error
事件发生了怎么了?
事实证明,在node.js代码库中占据一席之地可以解释这种混乱。
看看node/lib/_stream_readable.js,我们看到破坏函数在node/lib/internal/streams/destroy.js中定义。在这里,我们直接告诉(通过简单的javascript)error
事件只在一个truthy值被传递到destroy函数时才被发出(在语义上,这将是在相同的客户端代码中生成的错误,destroy
被调用)。
例如,如果我们只是通过更改来修改上面的示例代码
readable.destroy();
成为
readable.destroy(true); // or, more semantically correct, some Error value
我们得到以下输出:
$ node streaming-example.js
'error' event emitted with err: true
$
但是,现在我们失去了close
事件。那么......刚刚发生了什么?
再看看node/lib/internal/streams/destroy.js,我们注意到以下特殊情况逻辑:
const readableDestroyed = this._readableState &&
this._readableState.destroyed;
const writableDestroyed = this._writableState &&
this._writableState.destroyed;
if (readableDestroyed || writableDestroyed) {
if (cb) {
cb(err);
} else if (err &&
(!this._writableState || !this._writableState.errorEmitted)) {
process.nextTick(emitErrorNT, this, err);
}
return this;
}
// We set destroyed to true before firing error callbacks in order
// to make it re-entrance safe in case destroy() is called within callbacks
if (this._readableState) {
this._readableState.destroyed = true;
}
// If this is a duplex stream mark the writable part as destroyed as well
if (this._writableState) {
this._writableState.destroyed = true;
}
error
正确发射的事实,但它似乎close
没有发出暗示我们正在处理双工流。我们可以查看它是什么,但让我们坚持使用计算机告诉我们的内容,以保持简单。用这个while循环替换原来的while循环
while (null !== (chunk = src.read())) {
console.log('before-destroy, src:', JSON.stringify(src));
src.destroy(true); // should cause 'close' and 'error' events to emit
console.log('after-destroy, src:', JSON.stringify(src));
}
我们得到以下输出:
$ node streaming-example.js
before-destroy, src: {"_readableState":{"objectMode":false,"highWaterMark":65536,"buffer":{"head":null,"tail":null,"length":0},"length":0,"pipes":{"_writableState":{"objectMode":false,"highWaterMark":16384,"finalCalled":false,"needDrain":false,"ending":false,"ended":false,"finished":false,"destroyed":false,"decodeStrings":true,"defaultEncoding":"utf8","length":633,"writing":true,"corked":0,"sync":false,"bufferProcessing":false,"writelen":633,"bufferedRequest":null,"lastBufferedRequest":null,"pendingcb":1,"prefinished":false,"errorEmitted":false,"emitClose":false,"autoDestroy":false,"bufferedRequestCount":0,"corkedRequestsFree":{"next":null,"entry":null}},"writable":true,"_events":{},"_eventsCount":5,"path":"streaming-example.txt","fd":24,"flags":"w","mode":438,"autoClose":true,"bytesWritten":0,"closed":false},"pipesCount":1,"flowing":false,"ended":false,"endEmitted":false,"reading":true,"sync":false,"needReadable":true,"emittedReadable":false,"readableListening":true,"resumeScheduled":false,"paused":false,"emitClose":false,"autoDestroy":false,"destroyed":false,"defaultEncoding":"utf8","awaitDrain":0,"readingMore":true,"decoder":null,"encoding":null},"readable":true,"_events":{"end":[null,null,null]},"_eventsCount":5,"path":"streaming-example.js","fd":23,"flags":"r","mode":438,"end":null,"autoClose":true,"bytesRead":633,"closed":false}
after-destroy, src: {"_readableState":{"objectMode":false,"highWaterMark":65536,"buffer":{"head":null,"tail":null,"length":0},"length":0,"pipes":{"_writableState":{"objectMode":false,"highWaterMark":16384,"finalCalled":false,"needDrain":false,"ending":false,"ended":false,"finished":false,"destroyed":false,"decodeStrings":true,"defaultEncoding":"utf8","length":633,"writing":true,"corked":0,"sync":false,"bufferProcessing":false,"writelen":633,"bufferedRequest":null,"lastBufferedRequest":null,"pendingcb":1,"prefinished":false,"errorEmitted":false,"emitClose":false,"autoDestroy":false,"bufferedRequestCount":0,"corkedRequestsFree":{"next":null,"entry":null}},"writable":true,"_events":{},"_eventsCount":5,"path":"streaming-example.txt","fd":24,"flags":"w","mode":438,"autoClose":true,"bytesWritten":0,"closed":false},"pipesCount":1,"flowing":false,"ended":false,"endEmitted":false,"reading":true,"sync":false,"needReadable":true,"emittedReadable":false,"readableListening":true,"resumeScheduled":false,"paused":false,"emitClose":false,"autoDestroy":false,"destroyed":true,"defaultEncoding":"utf8","awaitDrain":0,"readingMore":true,"decoder":null,"encoding":null},"readable":true,"_events":{"end":[null,null,null]},"_eventsCount":5,"path":"streaming-example.js","fd":null,"flags":"r","mode":438,"end":null,"autoClose":true,"bytesRead":633,"closed":false}
'error' event emitted
$
这告诉我们,我们可能正在处理双工流,或者至少这向我们解释了为什么只发出error
事件(因为,特别是只看after-destroy
输出,this._readableState
和this._writeableState
都是真的,因此destroy
函数将局部变量readableDestroyed
和writeableDestroyed
设置为true,我们从console.log
注意到this._writableState.errorEmitted
是false
,因此process.nextTick(emitErrorNT, this, err);
在从destroy
函数退出之前被执行)。
这个问题现在得到了充分的回答。
作为一个额外的,很高兴知道duplex
流与其他类型的流之间的区别是什么。为此,this portion of the node.js documentation的快速参考是一个开始。
那么,当发出close
和error
事件时(例如,当我们不处理duplex
流时)的示例怎么样?以下代码和执行执行此操作,如下所示:
const readable = process.stdin;
const writable = process.stdout;
readable.setEncoding('utf8');
readable.on('readable', () => {
let chunk;
while ((chunk = readable.read()) !== null) {
writable.write(`data: ${chunk}`);
}
readable.destroy(true);
});
readable.on('close', () => console.log(`'close' event emitted`));
readable.on('error', (err) => console.log(`'error' event emitted with err:`, err));
执行此脚本以及一些I / O(键入asdf
,然后按回车键/回车键),提供以下输出:
$ node streaming-example2.js
asdf
data: asdf
'error' event emitted with err: true
'close' event emitted
$