为最大请求数/秒选择适当的异步方法进行批处理

问题描述 投票:0回答:1

我需要对某些外部API执行循环调用并有一些延迟,以防止“超出用户速率限制”限制。

Google Maps Geocoding API对'req / sec'敏感,允许10 req / sec。我应该为数百个联系人进行地理编码,并且需要这样的延迟。因此,我需要一个10个异步地理编码功能,每个延迟1秒。所以,我收集数组中的所有联系人,然后以异步方式循环遍历数组。

通常,我需要有N个并发线程,每个线程结束时D msecs延迟。整个循环遍历用户实体数组。像往常一样,每个线程处理单个实体。

我想有一个像这样的代码:

const N = 10;   # threads count
const D = 1000; # delay after each execution

var processUser = function(user, callback){ 
  someBusinessLogicProc(user, function(err) {
    setTimeout(function() {
      return callback(err);
    }, D);
  });      
 }

 var async = require('async') ;
 var people = new Array(900);

 async.batchMethod(people, processUser, N, finalCallback);

在这个伪代码中,batchMethod是我要求的方法。

node.js async.js node-async
1个回答
2
投票

延迟结果并不是你想要的。相反,您希望跟踪发送的内容以及发送的时间,因此只要您满足每秒请求的限制,就可以发送另一个请求。


这是一个函数的一般概念,它将控制每秒固定数量的请求的速率限制。这使用promises并要求你提供一个返回promise的请求函数(如果你现在不使用promises,你只需要将你的请求函数包装在一个promise中)。

// pass the following arguments:
//   array - array of values to iterate
//   requestsPerSec - max requests per second to send (integer)
//   maxInFlight - max number of requests in process at a time
//   fn - function to process an array value
//        function is passed array element as first argument
//        function returns a promise that is resolved/rejected when async operation is done
// Returns: promise that is resolved with an array of resolves values
//          or rejected with first error that occurs
function rateLimitMap(array, requestsPerSec, maxInFlight, fn) {
    return new Promise(function(resolve, reject) {
        var index = 0;
        var inFlightCntr = 0;
        var doneCntr = 0;
        var launchTimes = [];
        var results = new Array(array.length);

        // calculate num requests in last second
        function calcRequestsInLastSecond() {
            var now = Date.now();
            // look backwards in launchTimes to see how many were launched within the last second
            var cnt = 0;
            for (var i = launchTimes.length - 1; i >= 0; i--) {
                if (now - launchTimes[i] < 1000) {
                    ++cnt;
                } else {
                    break;
                }
            }
            return cnt;            
        }

        function runMore() {
            while (index < array.length && inFlightCntr < maxInFlight && calcRequestsInLastSecond() < requestsPerSec) {
                (function(i) {
                    ++inFlightCntr;
                    launchTimes.push(Date.now());
                    fn(array[i]).then(function(val) {
                        results[i] = val;
                        --inFlightCntr;
                        ++doneCntr;
                        runMore();
                    }, reject);
                })(index);
                ++index;
            }
            // see if we're done
            if (doneCntr === array.length) {
                resolve(results);
            } else if (launchTimes.length > requestsPerSec) {
                // calc how long we have to wait before sending more
                var delta = 1000 - (Date.now() - launchTimes[launchTimes.length - requestsPerSec]);
                if (delta > 0) {
                    setTimeout(runMore, delta);
                }

            }
        }
        runMore();
    });
}

用法示例:

rateLimitMap(inputArrayToProcess, 9, 20, myRequestFunc).then(function(results) {
    // process array of results here
}, function(err) {
    // process error here
});

这段代码背后的一般想法是这样的:

  1. 传入一个数组进行迭代
  2. 它返回一个promise,它的解析值是一个结果数组(按顺序)
  3. 您将最大数量的requestsPerSec传递给了
  4. 您在同一时间传递了最大数量的请求
  5. 您传递的函数将从正在迭代的数组中传递一个元素,并且必须返回一个promise
  6. 它在上次发送请求时保留一组时间戳。
  7. 要查看是否可以发送另一个请求,它会在数组中向后查看并计算在最后一秒发送的请求数。
  8. 如果该数字低于阈值,则发送另一个。
  9. 如果该数字符合阈值,那么它会计算您必须等待多长时间才能发送另一个并为该时间量设置计时器。
  10. 完成每个请求后,它会检查是否可以发送更多请求
  11. 如果任何请求拒绝其承诺,则返回的承诺立即拒绝。如果您不希望它在第一次出错时停止,则修改传入的函数以拒绝,但要解决某些值,您可以在以后处理结果时将其识别为失败的请求。

这是一个有效的模拟:https://jsfiddle.net/jfriend00/3gr0tq7k/

注意:如果传入的maxInFlight值高于requestsPerSec值,则此函数基本上只发送requestsPerSec请求,然后一秒钟后发送另一个requestsPerSec请求,因为这是保持在requestsPerSec边界下的最快方式。如果maxInFlight值与requestsPerSec相同或低于requestsPerSec,那么它将发送qazxswpoi然后当每个请求完成时,它将看到它是否可以发送另一个请求。

© www.soinside.com 2019 - 2024. All rights reserved.