Handling a large data file in NestJS with fast-csv

Problem description

I'm using NestJS v9, fast-csv v4, and BigQuery.

  1. My controller (where I upload a huge CSV)

@Post('upload')
@ApiOperation({ description: 'Upload CSV File' })
@ApiConsumes('multipart/form-data')
@ApiBody({
  schema: {
    type: 'object',
    properties: {
      file: {
        type: 'string',
        format: 'binary',
      },
    },
  },
})
@ApiResponse({
  type: UploaderResponse,
  status: 200,
})
@UseInterceptors(FileInterceptor('file', { fileFilter: fileFilter }))
@Auth(UserRole.Admin, UserRole.SuperAdmin)
async uploadFile(
  @Body() uploadCsvDto: UploadCsvDto,
  @UploadedFile() file: Express.Multer.File,
): Promise<UploaderResponse> {
  if (!file) {
    throw new HttpException(FileErrors.CSVFormat, HttpStatus.BAD_REQUEST);
  }
  return await this.filesService.uploadCsv(file, uploadCsvDto);
}
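
The fileFilter used in the FileInterceptor above is not shown; a minimal sketch, assuming it only accepts uploads that look like CSV files (the error message and details here are illustrative, not the actual implementation):

import { BadRequestException } from '@nestjs/common';

export const fileFilter = (
  req: any,
  file: Express.Multer.File,
  callback: (error: Error | null, acceptFile: boolean) => void,
): void => {
  // Hypothetical CSV-only check: accept the upload when the mimetype or
  // file extension looks like CSV, otherwise reject it.
  const looksLikeCsv =
    file.mimetype === 'text/csv' ||
    file.originalname.toLowerCase().endsWith('.csv');

  if (!looksLikeCsv) {
    return callback(
      new BadRequestException('Only CSV files are allowed'),
      false,
    );
  }
  callback(null, true);
};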

  2. My service

async uploadCsv(
  fileData: Express.Multer.File,
  uploadCsvDto: UploadCsvDto,
): Promise<UploaderResponse> {
  memoryUsage();
  await this.checkForDuplicates(uploadCsvDto);
  memoryUsage();

  // 2. Parse CSV
  await this.batchCsvResults(fileData.buffer, uploadCsvDto);
}

async batchCsvResults(
  buffer: Buffer,
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  const streamFromBuffer = Readable.from(buffer);
  const stream = parseStream(streamFromBuffer, {
    headers: (headers: string[]) => {
      return headers.map((h: string) => {
        if (this.checkCorrectColumn(h)) {
          return VALID_CSV_COLUMNS.find(
            (column: ValidColumn) => column.name === h.toLowerCase(),
          ).column;
        } else {
          console.log(`'${h}' is not a valid column.`);
        }
      });
    },
    delimiter: ';',
    encoding: 'utf8',
    trim: true, // trim whitespace from column values
  });

  let batch: DataRow[] = [];

  // Promise used to wait for all the batches to finish
  let resolvePromise: () => void;
  const processingPromise: Promise<void> = new Promise<void>((resolve) => {
    resolvePromise = resolve;
  });

  stream.on('data', async (row: DataRow) => {
    batch.push(row);

    if (batch.length === 200) {
      stream.pause();
      await this.processBatch(batch, uploadCsvDto)
        .then(() => {
          batch.length = 0;
          stream.resume();
        })
        .catch((err) => {
          console.error(err);
          stream.destroy();
        });
    }
  });

  stream.on('end', async () => {
    if (batch.length > 0) {
      await this.processBatch(batch, uploadCsvDto)
        .then(() => {
          batch = null;
          console.log('All batches have been processed');
        })
        .catch((err) => {
          console.error(err);
        });
    } else {
      console.log('All batches have been processed');
    }
    resolvePromise();
  });

  stream.on('error', (err) => {
    console.error(err);
  });

  await processingPromise;
}

async mapChildRows(
  data: DataRow[],
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  memoryUsage();
  let formattedRows = [];
  for (let i = 0; i < data.length; i++) {
    // Convert object values to their type
    const dataObject: any = data[i];
    Object.keys(dataObject).forEach((key) => {
      dataObject[key] = this.getChildColumnValue(key, dataObject[key]);
    });

    // Add dynamic fields
    dataObject[DYNAMIC_CSV_COLUMNS.idClient] = uploadCsvDto.idClient;
    dataObject[DYNAMIC_CSV_COLUMNS.subservice] = uploadCsvDto.subservice;
    dataObject[DYNAMIC_CSV_COLUMNS.subtask] = uploadCsvDto.subtask;

    // Add the current date to the uploaded row. There is an extra slot at the end of the array
    // because they do not send us upload_date; it's a fixed column whose value we add ourselves.
    dataObject[DYNAMIC_CSV_COLUMNS.upload_date] = actualDate();
    formattedRows.push(dataObject);
  }

  memoryUsage();
  await this.insertIntoCustomAnalyticsBigQuery(formattedRows);
  data = null;
  formattedRows = null;
}

async insertIntoCustomAnalyticsBigQuery(rows: DataRow[]) {
  try {
    memoryUsage();
    await this.bigQueryClient.dataset(DB_NAME).table(DATA_TABLE).insert(rows);
    console.log(`Inserted ${rows.length} rows`);
  } catch (ex) {
    console.log(JSON.stringify(ex));
    throw new HttpException(
      'Error inserting into Data table from CSV.',
      HttpStatus.BAD_REQUEST,
    );
  }
}

  3. Service explanation
  • First, I check BigQuery for duplicates from previously uploaded CSVs (this part is not important here).
  • Then I use a stream (fast-csv) to avoid loading the whole CSV content into memory.
  • I transform the data every 200 rows, and from those 200 rows I save them to BigQuery in chunks of 50 (see the sketch after this list).
  • Finally, I save everything into BigQuery.
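
processBatch is referenced in batchCsvResults but not shown above; based on the description in this list, a rough sketch of it (the chunk size of 50 matches the description, the exact structure is assumed):

async processBatch(
  batch: DataRow[],
  uploadCsvDto: UploadCsvDto,
): Promise<void> {
  // Assumed shape: split the 200-row batch into chunks of 50 and let
  // mapChildRows convert each chunk and insert it into BigQuery.
  const CHUNK_SIZE = 50;
  for (let i = 0; i < batch.length; i += CHUNK_SIZE) {
    const chunk = batch.slice(i, i + CHUNK_SIZE);
    await this.mapChildRows(chunk, uploadCsvDto);
  }
}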

The problem: this works for files of 50,000 rows, but larger files give me memory problems:

50,000-row CSV:

250,000-row CSV:

I don't understand why, since I clear all the variables when I no longer need them.

My Node server also tells me that my maximum memory size is 512MB.
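
Note that the whole upload already lives in file.buffer (Multer memory storage) before Readable.from(buffer) streams over it, so the stream starts with the full file resident in memory. A minimal sketch of streaming from a file on disk instead, assuming the interceptor were switched to multer's diskStorage (the destination path and helper name are hypothetical):

import { createReadStream } from 'fs';
import { parseStream } from 'fast-csv';

// Assumes the controller is configured roughly like:
//   FileInterceptor('file', { fileFilter, storage: diskStorage({ destination: '/tmp/uploads' }) })
// so that file.path points at a temporary file on disk instead of file.buffer holding it all.
function csvStreamFromDisk(file: Express.Multer.File) {
  return parseStream(createReadStream(file.path), {
    delimiter: ';',
    trim: true,
  });
}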

My memory-check function:

export function memoryUsage(): void {
  console.log(
    `APP is using ${
      Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100
    } MB of memory.`,
  );
}
javascript csv google-bigquery stream nestjs