使用express和axios进行传输编码分块http请求的错误处理

问题描述 投票:0回答:1

我尝试使用流式传输从 JS 中的数据库中获取大量数据,以防止一次将所有数据加载到内存中。我使用express作为我的服务器和使用Axios获取数据的nodeJS客户端。我已成功通过流式传输获取数据,但不知道如何处理流式传输时发生的错误。

特快服务器:

app.get('/stream', async (req, res) => {
    try {
      const cursor = //fetch data with a limit & skip (MongoDB)
      while(cursor.hasNext()) {
        const data = await cursor.next()

        const writeToStream = new Promise((resolve) => {
          res.write(data, 'utf8', () => {
            console.log("batch sent");
            resolve()
          })
  
        })
        await writeToStream
      }
      res.end()
    } catch(error) {
      console.log(`error: ${error.message}`)
      
      //How do i send the status & error message to the requestor? 
      //return res.status(400).end(error.message) // <-- wanted behavior
  })

客户:

    try {
      const res = await axios({
        url: 'http://localhost:3000/test',
        responseType: 'stream'
      })

      const { data } = res
      data.pipe(someStream)

      data.on('end', () => { 
        //stream finished
      })

      data.on('error', (error) => { // First Option
        //error without a status or message
        //res.status(error.status).send(error.message) // <-- wanted behavior  
      })
    } catch(error) {  // Second Option
      //error without a status or message
      //return res.status(error.status).send(error.message) // <-- wanted behavior
    }
      

客户端上的错误处理有效(代码运行),但我无法弄清楚如何从服务器向客户端发送状态和消息,指示错误并指定它。

版本: “axios”:“^1.5.1”,“express”:“^4.18.2”

希望得到一些帮助。 提前谢谢!

javascript express error-handling axios chunked-encoding
1个回答
0
投票

问题是,在将标头发送到客户端后,您无法设置标头。因此,当您使用

res.write
启动流时,标头已作为
200
发送。

你能做的就是使用一个技巧。您可以为

DATA
ERROR
设置固定前缀。通过这种方式,您可以区分什么是真实数据和什么是真实错误。我知道这不是最有效的方法,但是可行并且似乎合理,因为流堆栈本身根本不提供错误管理机制

服务器.ts

import express from 'express'

const app = express()
const port = 3000

const DATA_PREFIX = "DATA:"
const ERROR_PREFIX = "ERROR:"

app.get('/stream', async (req, res) => {
  try {
    // I have replaced the query to MongoDB with some sample data.
    const sample_datas = ["data1", "data2", "data3", "data4", "data5"]

    for (let dt of sample_datas) {
      const writeToStream = new Promise((resolve) => {

        // if dt == "data4", then simulate an error
        if (dt == "data4") {
          throw new Error("data4 has problems.")
        }

        res.write(DATA_PREFIX + dt, 'utf8', () => {
          console.log("batch sent");
          resolve(true)
        })
      })
      await writeToStream
    }

    res.end()
  } catch (error) {
    console.log(`error: ${error.message}`)

    //How do i send the status & error message to the requestor?
    res.write(ERROR_PREFIX + error.message)
    res.end()
  }
})

app.listen(port, () => {
  console.log(`Example app listening on port ${port}`)
})

客户端.ts

import axios from 'axios'
import fs from 'fs'
import stream from 'stream'

const DATA_PREFIX = "DATA:"
const ERROR_PREFIX = "ERROR:"

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

const main = async () => {

  await sleep(2000)

  try {
    const res = await axios({
      url: 'http://localhost:3000/stream',
      responseType: 'stream'
    })

    const { data } = res

    const writableStream = fs.createWriteStream('__output__.txt');

    const appendText = new stream.Transform({
      transform(chunk: Buffer, encoding, callback) {

        const chunk_str = chunk.toString('utf8')

        if (chunk_str.startsWith(DATA_PREFIX)) {
          this.push(Buffer.from(chunk_str.replace(DATA_PREFIX, ""), 'utf8'))
        }
        else if (chunk_str.startsWith(ERROR_PREFIX)) {
          const error = chunk_str.replace(ERROR_PREFIX, "")
          error_function(new Error(error))
        }

        callback();
      }
    });

    data.pipe(appendText).pipe(writableStream);

    data.on('end', () => {
      console.log("stream finished")
    })

    const error_function = (error: Error) => {
      console.log("data.on error:", error)
    }

    data.on('error', error_function)

  } catch (error) {
    console.log("catch error:", error)
  }
}

main()
© www.soinside.com 2019 - 2024. All rights reserved.