NodeJS 叉平衡器

问题描述 投票:0回答:1

我很久以前就读过一篇关于在单独的子进程中解决高内存使用任务的文章。根据这篇文章,我已经实现了相同的并且延迟已经减少。但目前面临有线问题。

问题陈述: Fastify 服务器 - 浏览器将向 fastify 服务器发出请求,在 fastify 中我需要向下游进行 100 多个并行 axios api 调用(不要问为什么我 dng - 目前我们遇到了一些挑战)并且我将返回一旦所有 api 调用成功,就会响应浏览器。这些并行 API 调用会产生延迟问题,特别是当 Fastify 服务器的 TPS(每秒事务数)较高时。

解决方案:为了缓解延迟问题,我尝试使用 NodeJS 的 Fork 功能将并行调用的逻辑卸载到子进程。正如预期的那样,这减少了延迟。然而,我当前面临的问题是,当我的服务器有大量并行请求时,大约 60% 的请求收到了错误的响应。

实施:
ForkBalancer.js

import { ChildProcess, fork } from 'child_process';

const requestLimit = 0;

interface forkResponse {
  kill: boolean;
  string?: string;
}

class ForkBalancer {
path: string;
forks: number;
maxRAM?: number;
args?: Array<string>;

private activeFork: number;
private resolvers = new Map();
private renderers: Array<ChildProcess>;

constructor({ path = '', forks = 5, maxRAM = 250, args = [] }) {
    this.activeFork = 0;
    this.forks = forks;
    this.maxRAM = maxRAM;
    this.path = path;
    this.args = args;
    this.renderers = Array.from({ length: forks }, () => this.createFork());
}

public getFromRenderer(params: any): Promise<forkResponse> {
    const { resolvers, maxRAM, activeFork, restartFork, renderers } = this;
    const renderer = renderers[activeFork];

    return new Promise(function(resolve, reject) {
        try {
            renderer.once('message', (res: any) => {
                resolvers.delete(params.request.url);
                resolve(res);

                if (res.kill) restartFork();
            });

            if (!resolvers.has(params.request.url)) {
                renderer.setMaxListeners(requestLimit);
                resolvers.set(params.request.url, resolve);
                renderer.send({ ...params, maxRAM });
            }
        } catch (error) {
            resolvers.delete(params.request.url);
            reject(error);
        }
    });
}

private createFork = () => {
    const { path, args } = this;
    return fork(path, args);
};

private restartFork = () => {
    const { activeFork, renderers, next, createFork } = this;
    const renderer = renderers[activeFork];
    next();
    renderer.kill();
    this.renderers[activeFork] = createFork();
};

private next = () => {
    const { activeFork, forks } = this;
    if (activeFork === forks - 1) {
        this.activeFork = 0;
    } else {
        this.activeFork++;
    }
};
}
export default ForkBalancer; 

并行API.js

import axiosInstance, { AxiosRequestConfig } from 'axios';

const maxRAM = 128
process.on('message', async (params: any) => {
  const { totalPages, offset: offsetProps = 0, PAGE_SIZE_LIMIT, request, body, testId, url } = params;
  const requests = [];
 for (let offset = offsetProps; offset <= totalPages; offset++) {
   requests.push(
      axiosInstance.post(
    `API_URL/search/v2?page=${offset}&limit=${PAGE_SIZE_LIMIT}`,
    body,
    {
      headers: {
        accept: 'application/json',
        authorization: `${request.token?.token_type} ${request.token?.access_token}`,
      }
    },
  )
);
}
const results = await Promise.allSettled(requests);

const list: any = [];
let isPartialFailed = false;
results.forEach((result) => {
if (result.status === 'fulfilled') {
  const quotesListData = result.value?.data?.quotes;
  if (Array.isArray(quotesListData)) {
    list.push(...quotesListData);
  }
} else {
  isPartialFailed = true;
}
});
const { heapUsed } = process.memoryUsage();

if (process.send) {
  process.send({
  key: request.url,
  list,
  url: request.url,
  testId,
  isPartialFailed,
  kill: heapUsed > maxRAM * 1024 * 1024,
 });
}
});

实施:

import path from 'path';
import ForkBalancer from './forkBalancer';

const forkBalancer = new ForkBalancer({
 path: path.resolve(__dirname, './ParallelAPI'),
});

const handler = async (req, res) => {
  const { body } = req.body;
  const { testId } = req.query;
  const response = await forkBalancer.getFromRenderer({
    request: { 
      token: request.token,
      url: request.url 
    },
    testId,
    PAGE_SIZE_LIMIT: 50,
    body,
    totalPages: 100
  });
  return res.send(response);
 }

 fastify.post('/getAllItems', handler);
javascript node.js child-process fastify
1个回答
0
投票

为了缓解延迟问题,我尝试使用 NodeJS 的 Fork 功能将并行调用的逻辑卸载到子进程。正如预期的那样,这减少了延迟。

我不知道如何...您刚刚在所有这些之上添加了另一层抽象。在您的用例中,我认为分叉的原因为零。您不受 CPU 限制。您正在等待网络请求成功。

但是,我当前面临的问题是,当我的服务器有大量并行请求时,大约 60% 的请求收到了错误的响应。

某些上游服务器可能会限制您的请求,是吗?除非有计划,否则每秒数千次 API 调用通常会让您受到限制。

© www.soinside.com 2019 - 2024. All rights reserved.