NodeJs流,管道和https帖子

问题描述 投票:0回答:1

我需要进行一些健全性检查。

我正在运行节点11.10.1

我有一个使用nodejs oracledb库从oracle数据库读取的进程。有一个流功能,我可以执行select *并以1万个对象为批处理来流处理结果。然后,我通过https将数据发布到索引器。对象流被注入到管道函数中。

我一直在使用他下面的代码。我正在尝试调试吞吐量。有时我可以看到每秒有2k个文档正在通过该管道处理。大多数时候,我看到<150。在我开始调试索引服务器之前。我想确保这些功能的编码正确。

  async function streamReindex(databaseStream) {
    let pipeline = util.promisify(stream.pipeline)
    await pipeline(
      contractLinesStream,//  "oracledb": "^4.0.0", stream function
      camelize.camelizeStream(), //"camelize2": "^1.0.0", library wrapped in ,"through2": "^3.0.1" library to make it an object stream
      JSONStream.stringify(), //"JSONStream": "^1.3.5"
      reindexClient.streamReindex(core)
    )
  }

// reindexClient code.

  function streamReindex(core) {
    const updateUrl = baseUrl + core + '/update'
    const options = {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      'auth': `${user.username}:${user.password}`,
    }
    let postStream = https.request(updateUrl, options, (res) => {
      let response = {
        status: {
          code: res.statusCode,
          message: res.statusMessage
        },
        headers: res.headers,
      }
      if (res.statusCode !== 200) {
        postStream.destroy(new Error(JSON.stringify(response)))
      }
    })
    postStream.on('error', (err)=>{
      throw new Error(err)
    })
    postStream.on('socket', (socket) => {
      socket.setKeepAlive(true, 240000)
    })
    return postStream
  }

如果我从管道中删除reindexClient.streamReindex(core)函数。我看到每秒约有5k个对象的吞吐量。我一直在研究流的高水印功能,但似乎无法弄清楚如何在postStream上应用它。如果我console.log帖子流,它也不会说在对象模式下。这意味着它的高水印以字节为单位,我相信该阈值较低。

如果您需要更多信息,我会尽量提供。

node.js https stream node-streams
1个回答
0
投票

尽管您的问题似乎与oracledb无关,但我将其放在此处以便格式化代码。调整oracledb流可能会带来一些性能上的好处,例如:

   diff --git a/lib/queryStream.js b/lib/queryStream.js
   index 08ddc720..11953e4b 100644
   --- a/lib/queryStream.js
   +++ b/lib/queryStream.js
   @@ -24,7 +24,7 @@ const { Readable } = require('stream');
    class QueryStream extends Readable {

      constructor(rs) {
   -    super({ objectMode: true });
   +    super({ objectMode: true, highWaterMark: 64 });  // choose your own value
    this._fetching = false;
    this._numRows = 0;

欢迎将高水印设置为queryStream()选项的PR。

© www.soinside.com 2019 - 2024. All rights reserved.