我正在使用一些外部库,这些库提供了一些返回 EventEmitter 对象实例的函数。这些函数需要按特定顺序调用,并使用库示例中的嵌套回调实现。
我希望能够利用 Promises 稍微清理一下我的代码,我尝试了类似的方法:
const messageEmitters = await getMessages(FetchMessageEmitter)
const messagesBody = await getMessageBody(messagesEmitters[0])
我的问题是,当节点解释器到达消息主体时,传递的 eventEmitters 有一个参数,但还没有监听器,它们已定义,但活动监听器 (emitter._eventCount) 的数量在事件循环。
但在下一阶段,听众很活跃。我认为这可能来自我对 Promises 的使用。我正在考虑某种解决方案来监听所有在履行承诺之前定义的听众,但我不知道该怎么做。
/*
* This should resolve an array of EventEmitters (one per messages)
*/
const getMessages = async (emitter) => {
return new Promise((fulfill, reject) => {
let _messages = []
emitter.on('message', m => _messages.push(m))
emitter.once('end', () => fulfill(_messages))
emitter.once('error', error => reject(error))
})
}
/*
* This one should resolve some data after getting the body of the message
*/
const getMessageBody = async (emitter) => {
return new Promise((fulfill, reject) => {
let _body = null
emitter.on('body' => (stream) {
// do something with the stream
})
emitter.once('end', () => { /* fulfill the promise */ })
emitter.once('error', error => reject(error))
})
}
这是在履行接收发射器的承诺之前设置各个身体侦听器的方法:
// very much like getMessages(), except this resolves to an array of
// promises that themselves resolve to an array of message bodies
const getBodyPromises = async (emitter) => {
return new Promise((fulfill, reject) => {
let _bodyPromises = []
// call the OP's get message here, so the body promise is set up
// before control returns to the emitter-creating code
emitter.on('message', m => {
let bodyPromise = getMessageBody(m)
_bodyPromises.push(bodyPromise)
});
emitter.once('end', () => fulfill(_bodyPromises))
emitter.once('error', error => reject(error))
})
}
// use the OP's getMessageBody as-is
const getMessageBody = async (emitter) => { ... }
现在调用者期望
getBodyPromises()
返回一个解析为一系列 promises 的 promise。
const bodyPromises = await getBodyPromises(FetchMessageEmitter)
const messagesBodies = await Promise.all(bodyPromises);
如果需要,您仍然可以单独处理身体承诺。只需将它们循环并在每个上附加一个
.then()
。
使用这种方法,您可以确保在 getMessageBody 函数中访问它们之前设置所有事件监听器。
const getMessages = async (emitter) => {
return new Promise((fulfill, reject) => {
let _messages = []
// Use another Promise to wait for all the event listeners to be set up
const listenersSetPromise = new Promise((resolve) => {
emitter.on('newListener', (event, listener) => {
// Check if all the expected event listeners have been set up
if (emitter.listenerCount('message') === 1 &&
emitter.listenerCount('end') === 1 &&
emitter.listenerCount('error') === 1) {
resolve()
}
})
})
// Wait for the listeners to be set up before resolving the outer Promise
listenersSetPromise.then(() => {
emitter.on('message', m => _messages.push(m))
emitter.once('end', () => fulfill(_messages))
emitter.once('error', error => reject(error))
})
})
}
新的 promise
listenersSetPromise
期望所有预期的事件监听器都使用 newListener
event
一旦配置了所有监听器,
resolver
函数就会被调用到 fulfill
listenersSetPromise
.
最后,
listenersSetPromise
使用 then
方法来设置事件侦听器和 fulfill
带有 EventEmitter 数组的外部承诺。