如何在SIGTERM上打破Async系列?

问题描述 投票:3回答:3

假设我有以下情况 -

async.series(
  [
    function (cbi) {
      students.getAll('student', function (err, response) {
        if (err) {
          logger.error(err);
        }
        cbi(err, response);
      });
    },
    function (cbi) {
      students.deleteAll('student', function (err, response) {
        if (err) {
          logger.error(err);
        }
        cbi(err, response);
      });
    },
    function (cbi) {
      teachers.getAll('teacher', function (err, response) {
        if (err) {
          logger.error(err);
        }
        cbi(err, response);
      });
    },
    function (cbi) {
      teachers.deleteAll('teacher', function (err, response) {
        if (err) {
          logger.error(err);
        }
        cbi(err, response);
      });
    };
  ]
);

SIGTERM被发送时,我想要一个优雅的清理。这是一个清理所有学生或所有教师,无论哪一个正在进行信号发送完成,下一个不应该开始。

function (cbi) {
  students.getAll('student', function (err, response) {
    if (err || GLOBAL_VAR_SIGTERM === true) {
      logger.error(err);
    }
    cbi(err, response);
  });
}

我在想我应该设置一个全局变量来跟踪SIGTERM信号。

process.on('SIGTERM', function onSigterm () {
  GLOBAL_VAR_SIGTERM = true;
}

有没有更好的方法打破异步系列打破SIGTERM信号?

javascript node.js async.js sigterm
3个回答
3
投票

正如@adamrights在his answer中指出的那样,你的代码中的主要问题是你没有使用真正的cbi(err, response)第一个参数调用err,这对于阻止async.series继续执行队列中的下一个任务至关重要。

现在您的代码应该可以工作,但您的代码中有一个重复的模式:

function (cbi) {
  students.getAll('student', function (err, response) {
    // these 3 lines appear in every callback function
    if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
    if (err) logger.error(err);
    cbi(err, response);
    // end of repeat pattern
  });
}

传递给每个异步任务的回调总是做同样的3-liner事情。我们知道DRY规则,将重复模式提取到另一个函数以尽可能多地重用它总是一个好主意。

因此,您应该声明一个工厂函数,而不是重复声明匿名函数。

function callbackFactory(cbi) {
  return function(err, response) {
    if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");   
    if (err) logger.error(err);
    cbi(err, response);
  }
}

// use arrow function to write more concise code
async.series(
  [
    cbi => students.getAll('student', callbackFactory(cbi)),
    cbi => students.deleteAll('student', callbackFactory(cbi)),
    cbi => teachers.getAll('teacher', callbackFactory(cbi)),
    cbi => teachers.deleteAll('teacher', callbackFactory(cbi)),
  ]
);

高级主题:使用装饰器来处理横切问题

让我们再探讨一下这个话题。显然,在接受SIGTERM之前中止是一个跨领域的问题,应该与业务逻辑分开。假设您的业务逻辑因任务而异:

async.series(
  [
    cbi => students.getAll('student', (err, response) => {
      if (err) {
        logger.error(err);
        return cbi(err);
      }
      updateStudentCount(response.data.length)  // <- extra work
      cbi(err, response);
    }),
    cbi => teachers.getAll('student', (err, response) => {
      if (err) {
        logger.error(err);
        return cbi(err);
      }
      updateTeacherCount(response.data.length)  // <- different extra work
      cbi(err, response);
    })
  ]
);

因为回调是变化的,所以很难将其提取到像以前一样的工厂函数中。从这个角度来看,我们最好将中止早期行为注入每个任务,让编写正常的业务逻辑变得容易。

这是装饰模式派上用场的地方。但是全局变量不是实现它的最佳工具,我们将使用事件监听器。

装饰器的基本界面如下所示:

// `task` will be things like `cbi => students.getAll('student', ... )`
function decorateTaskAbortEarly(task) {
  return (originalCbi) => {
    ...
    task(originalCbi)
  }
}

以下是我们的实施清单:

  • 如果我们收到originalCbi,我们将打电话给SIGTERM
  • 但是当我们没有收到SIGTERM时,originalCbi仍可在任何异步任务的回调中调用,如正常
  • 如果曾经调用originalCbi,我们应该取消订阅SIGTERM以防止内存泄漏

实施:

function decorateTaskAbortEarly(task) {
  return (originalCbi) => {
    // subscribe to `SIGTERM`
    var listener = () => originalCbi(new Error("SIGTERM: Aborting remaining tasks"));
    process.once('SIGTERM', listener);

    var wrappedCbi = (err, response) => {
      // unsubscribe if `cbi` is called once
      process.off('SIGTERM', listener);
      return originalCbi(err, response);
    };
    // pass `cbi` through to `task`
    task(wrappedCbi);
  }
}

// Usage:
async.series(
  [
    cbi => students.getAll('student', (err, response) => {
      if (err) {
        logger.error(err);
        return cbi(err);
      }
      updateStudentCount(response.data.length)
      cbi(err, response);
    }),
    cbi => teachers.getAll('student', (err, response) => {
      if (err) {
        logger.error(err);
        return cbi(err);
      }
      updateTeacherCount(response.data.length)
      cbi(err, response);
    })
  ].map(decorateTaskAbortEarly)  // <--- nice API
);

3
投票

如果你想在SIGTERM中回应async.series()事件,那么你是正确的,最简单的方法是跟踪全局变量。

但是你需要将cbi(err, response)函数中的第一个参数(错误优先回调)设置为true以便打破系列。

所以:

if (err || GLOBAL_VAR_SIGTERM === true) {
  logger.error(err);
}

应该更像是:

if (err) logger.error(err);
if (GLOBAL_VAR_SIGTERM) err = new Error("SIGTERM: Aborting remaining tasks");
// You could just do err = true
// But best practice is to use an Error instance.

然后因为cbi(err, response)将被调用err值等于true,剩下的任务将不会被运行。


1
投票

我喜欢其他答案。这是实现同样目标的另一种方式。我正在使用我自己的例子:

var async = require('async');
var ifAsync = require('if-async')
var GLOBAL_VAR_SIGTERM = false;

async.series({
    one: ifAsync(notsigterm).then(function (callback) {
        setTimeout(function () {
            console.log('one');
            callback(null, 1);
        }, 1000);
    }),
    two: ifAsync(notsigterm).then(function (callback) {
        setTimeout(function () {
            console.log('two');
            callback(null, 2);
        }, 1000);
    }),
    three: ifAsync(notsigterm).then(function (callback) {
        setTimeout(function () {
            console.log('three');
            callback(null, 3);
        }, 1000);
    }),
    four: ifAsync(notsigterm).then(function (callback) {
        setTimeout(function () {
            console.log('four');
            callback(null, 4);
        }, 1000);
    }),
}, function (err, results) {
    if (err) {
        //Handle the error in some way. Here we simply throw it
        //Other options: pass it on to an outer callback, log it etc.
        throw err;
    }
    console.log('Results are ' + JSON.stringify(results));
});

process.on('SIGTERM', function onSigterm () {
    console.log('SIGTERM caught');

  GLOBAL_VAR_SIGTERM = true;
});

function notsigterm(callback) {
    if (!GLOBAL_VAR_SIGTERM) return callback(null, true)
    else return callback(null, false)
}

我正在使用一个名为ifAsync的包,它允许您使用谓词notsigterm来决定是否应该调用回调。如果notsigterm返回true,那么将调用回调,否则将跳过它。这是与其他人类似的答案,但不知怎的,我觉得这个更干净。如果您有疑问,请告诉我。

© www.soinside.com 2019 - 2024. All rights reserved.