我们使用以下代码将查询结果流式传输回客户端:
app.get('/events', (req, res) => {
try {
const stream = db('events')
.select('*')
.where({ id_user: 'foo' })
.stream()
stream.pipe(JSONStream.stringify()).pipe(res)
} catch (err) {
next(err)
}
})
虽然代码似乎具有出色的内存使用情况(稳定/低内存使用),但它会创建随机数据库连接获取超时:
Knex:获取连接超时。泳池可能已经满了。是 您错过了 .transacting(trx) 调用吗?
这种情况在生产中以看似随机的间隔发生。知道为什么吗?
调试起来很痛苦:
好吧,发生这种情况是因为中止的请求(即客户端在请求中关闭浏览器)不会将连接释放回池。
首先,确保您处于最新的knex;或者至少是 v0.21.3+,它引入了对流/池处理的修复。
从一开始你就有几个选择:
stream.pipeline
而不是 stream.pipe
来正确处理中止的请求,如下所示:
const { pipeline } = require('stream')
app.get('/events', (req, res) => {
try {
const stream = db('events')
.select('*')
.where({ id_session: req.query.id_session })
.stream()
return pipeline(stream, JSONStream.stringify(), res, err => {
if (err) {
return console.log(`Pipeline failed with err:`, err)
}
console.log(`Pipeline ended succesfully`)
})
} catch (err) {
next(err)
}
})
或者监听
close
上的 [req
][close] 事件并自行销毁数据库流,如下所示:
app.get('/events', (req, res) => {
try {
const stream = db('events')
.select('*')
.where({ id_session: req.query.id_session })
.stream()
// Not listening to this event will crash the process if
// stream.destroy(err) is called.
stream.on('error', () => {
console.log('Stream was destroyed')
})
req.on('close', () => {
// stream.end() does not seem to work, only destroy()
stream.destroy('Aborted request')
})
stream.pipe(JSONStream.stringify()).pipe(res)
} catch (err) {
next(err)
}
})
stream.end
好像不起作用。