带有等待功能的 JavaScript 简单任务运行器

问题描述 投票:0回答:7

我想实现类似任务运行程序的东西,它将被推送新任务。这些任务中的每一个都可以是一些异步操作,例如等待用户或进行 API 调用或其他操作。任务运行程序确保一次只能执行允许数量的任务,而其他任务将继续等待,直到轮到它们。

class Runner {
  constructor(concurrent) {
    this.taskQueue = []; //this should have "concurrent" number of tasks running at any given time

  }

  push(task) {
    /* pushes to the queue and then runs the whole queue */
  }
}

调用模式是

let runner = new Runner(3);
runner.push(task1);
runner.push(task2);
runner.push(task3);
runner.push(task4);

其中任务是一个函数引用,它将在最后运行一个回调,我们可以知道它已经完成。所以应该是这样的

let task = function(callback) {
  /* does something which is waiting on IO or network or something else*/
  callback(); 
}

所以我正在向跑步者推出类似

的闭包
runner.push(function(){return task(callback);});

我想我可能还需要添加一个 waitList 队列。但任务本身并不是承诺,所以我不知道如何检查这些任务是否完成。

无论如何,我需要正确的方法。

javascript ecmascript-6 async-await es6-promise
7个回答
7
投票

概念的简单演示。更改了变量名称以便更好地理解。

class Runner {
  constructor(concurrency = 1) {
    this.concurrency = concurrency;
    this.waitList = [];
    this.count = 0;
    this.currentQ = [];
  }

  push(task) {
    this.waitList.push(task);
    this.run();
  }

  run() {
    let me = this;
    if (this.count < this.concurrency) {
      this.count++;
      if (this.waitList.length > 0) {
        let task = this.waitList.shift();
        let id = task.id;
        this.currentQ.push(id);
        this.showQ();
        task.task(function() {
          this.currentQ.splice(this.currentQ.indexOf(id), 1);
          this.showQ();
          this.count--;
          this.run();
        }.bind(me))
      }
    }
  }

  showQ() {
    let q = "";
    q = this.currentQ.join(', ');
    document.getElementById("running").innerHTML = q;
  }
}

let output = document.getElementById("output");

let task1 = {
  id: 1,
  task: function(done) {
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task1");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task1");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 3000)
  }
}
let task2 = {
  id: 2,
  task: function(done) {
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task2");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task2");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 6000)
  }
}
let task3 = {
  id: 3,
  task: function(done) {
    this.id = "3";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task3");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task3");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 10000)
  }
}
let task4 = {
  id: 4,
  task: function(done) {
    this.id = "4";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task4");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task4");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 5000)
  }
}
let task5 = {
  id: 5,
  task: function(done) {
    this.id = "5";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task5");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task5");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 6000)
  }
}
let task6 = {
  id: 6,
  task: function(done) {
    this.id = "6";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task6");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task6");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 4000)
  }
}
let task7 = {
  id: 7,
  task: function(done) {
    this.id = "7";
    let div = document.createElement("div");
    let node = document.createTextNode("Picking up Task7");
    div.appendChild(node);
    output.appendChild(div);
    setTimeout(function() {
      div = document.createElement("div");
      node = document.createTextNode("Finished Task7");
      div.appendChild(node);
      output.appendChild(div);
      done()
    }, 5000)
  }
}

let r = new Runner(3);
r.push(task1);
r.push(task2);
r.push(task3);
r.push(task4);
r.push(task5);
r.push(task6);
r.push(task7);
Currently running
<div id="running">

</div>
<hr>
<div id="output">

</div>


2
投票

所以我正在向跑步者推出类似

的闭包
runner.push(function(){return task(callback);});

看起来运行程序的缺失部分被添加到调用语法中。一个更完整的跑步者可能看起来像:

class Runner {
  constructor(concurrent) {
    this.taskQueue = []; // run at most "concurrent" number of tasks at once
    this.runCount = 0;
    this.maxCount = concurrent;
    this.notifyEnd = this.notifyEnd.bind(this);
  }
  
  notifyEnd() {
    --this.runCount;
    this.run();
  }

  run() {
    while( (this.runCount < this.maxCount) && taskQueue.length) {
      ++this.runCount;
      // call task with callback bound to this instance (in the constructor)
      taskQueue.shift()(this.notifyEnd);
    } 
  }

  push(task) {
    this.taskQueue.push(task);
    this.run();
  }
}

现在,使用带有回调参数的函数来调用跑步者的

push
方法。运行器状态包含在
runCount
的值中,0 表示空闲,或者正整数表示任务正在运行。

还有几个问题:

  1. 可以同步调用任务以将其添加到运行器的代码。它缺乏 Promises 始终从事件队列异步调用

    then
    回调的严格方法。

  2. 任务代码必须正常返回,没有错误。这在 JavaScript 中并非闻所未闻,其中未捕获的承诺拒绝错误的主机跟踪器必须执行相同的操作,但在应用程序脚本中相当不寻常。运行程序对任务的调用可以放置在

    try/catch
    块中以捕获同步错误,但如果在任务引发同步错误之前收到回调,则还应该添加代码以忽略错误 - 否则正在运行的任务计数可能会出错.

  3. 如果任务多次调用回调,上面的运行器中正在运行的任务计数将会被打乱。

Promise 接口的开发和标准化背后也有类似的考虑。我建议在考虑到潜在的缺点后,如果一个简单的任务运行程序满足所有要求,那么就使用一个。如果需要额外的稳健性,那么承诺任务并编写一个更加以承诺为中心的运行程序可能是更好的选择。


2
投票

将任务定义为承诺(更具体地说,承诺返回函数)是合理的,因为这对它们来说是一个很好的用例;目前错误无法处理(如果没有 Promise,它们可以按照惯例使用 Node 风格的回调来处理)。即使它们不是承诺,承诺也可以在内部使用:

  constructor(concurrent = 1) {
    this.concurrent = concurrent;
    this.taskQueue = [];
  }

  push(task) {
    this.taskQueue.push(task);
  }

  run() {
    let tasksPromise = Promise.resolve();
    for (let i = 0; i < this.taskQueue.length; i += this.concurrent) {
      const taskChunk = this.taskQueue.slice(i, i + this.concurrent));
      const taskChunkPromises = taskChunk.map(task => new Promise(resolve => task(resolve)));
      tasksPromise = tasksPromise.then(() => Promise.all(taskChunkPromises));
    }

    return tasksPromise;
  }

async..await
在这种情况下可以提供好处:

  async run() {
    for (let i = 0; i < this.taskQueue.length; i += this.concurrent) {
      const taskChunk = this.taskQueue.slice(i, i + this.concurrent));
      const taskChunkPromises = taskChunk.map(task => new Promise(resolve => task(resolve)));
      await Promise.all(taskChunkPromises);
    }
  }

1
投票

所以我正在向跑步者推出类似

的闭包
runner.push(function(){return task(callback);});

您是否可以将 taskcallback 函数指定为 push 函数中的单独参数?如果是的话,你可能可以做这样的事情。

class Runner {
  constructor(maxCount = 1) {
    this.taskQueue = [];
    this.maxCount = maxCount;
    this.currentCount = 0;
  }

  run() {
    if (this.taskQueue.length && this.currentCount < this.maxCount) {
      const task = this.taskQueue.shift();
      task();
    }
  }

  push(task, callback) {
    this.taskQueue.push(() => {
      this.currentCount++;
      task((...args) => {
        this.currentCount--;
        callback(...args);
        this.run();
      })
    })
    this.run();
  }
}

// Example usage
const myCallback = (caller) => {
  console.log(`myCallback called by ${caller} ${new Date()}`);
};

const task1 = (callback) => {
  console.log(`task1 started ${new Date()}`);
  setTimeout(() => {
    callback('task1');
  }, 3000);
};

const task2 = (callback) => {
  console.log(`task2 started ${new Date()}`);
  setTimeout(() => {
    callback('task2');
  }, 3000);
};

const task3 = (callback) => {
  console.log(`task3 started ${new Date()}`);
  setTimeout(() => {
    callback('task3');
  }, 3000);
};

const task4 = (callback) => {
  console.log(`task4 started ${new Date()}`);
  setTimeout(() => {
    callback('task4');
  }, 3000);
};

const runner = new Runner(2);
runner.push(task1, myCallback);
runner.push(task2, myCallback);
runner.push(task3, myCallback);
runner.push(task4, myCallback);


0
投票

回调样式:

run () {
    var task = this.taskQueue.unshift();
    task(() => this.run());
}

0
投票

有趣的问题。所以我尝试为 JS 实现一个非常简单的异步任务运行器。我相信在发送大量电子邮件等时这些是必不可少的。然而,这个特别演示了 Fetch API 上的工作,我相信它可以轻松地实现任何异步工作。

这里我有一个构造函数,它为我们提供了一个异步任务运行程序的实例,在该实例中我们将同时运行给定数量的异步任务作为一个块,并等待给定的时间段继续下一个块,直到我们用完为止

taskQueue
中的所有任务。但与此同时,我们仍然可以推送新任务,并且它将继续以块的形式调用任务,包括新添加的任务。在整个过程中,我们也可以自由修改正在处理的每个块的间隔。

我在这里没有实现的是正确的错误处理,除了

.catch(console.log)
尝试n次然后失败机制。这可以简单地从我之前的答案之一来实现。

当我们提供异步任务时,我们当然需要连续的

.then(
)阶段来完成工作。在我的抽象中,它们作为待办事项函数在数组中提供。假设您总共要做 20 次
fetch
,例如;

var url     = "https://jsonplaceholder.typicode.com/posts/",
    fetches = Array(20).fill().map((_,i) => () => fetch(`${url+(i+1)}`));

那么你可以提供一个

todo
数组作为;

var todos   = [resp => resp.json(), json => console.log(json)];

其中每个项目都是前面提到的后续

.then()
阶段的回调。以下代码最初以 1000 毫秒的间隔以 3 个块的形式运行任务,但 2 秒后切换为 500 毫秒的间隔。

function TaskRunner(tasks = [],
                    todos = [],
                    group = 1,
                    interval = 1000){

  this.interval   = interval;
  this.concurrent = group;
  this.taskQueue  = tasks;
  this.todos      = todos;
}

TaskRunner.prototype.enqueue = function(ts = []){
                                 var cps; // current promises
                                 this.taskQueue = this.taskQueue.concat(ts);
                                 cps = this.taskQueue.splice(0,this.concurrent)
                                                     .map(t => this.todos.reduce((p,td) => p.then(td), t())
                                                                         .catch(console.log));
                                 this.taskQueue.length && setTimeout(this.enqueue.bind(this), this.interval);
                               };

var url     = "https://jsonplaceholder.typicode.com/posts/",
    fetches = Array(20).fill().map((_,i) => () => fetch(`${url+(i+1)}`)),
    todos   = [resp => resp.json(), json => console.log(json)],
    goFetch = new TaskRunner();

goFetch.todos.push(...todos);
goFetch.concurrent = 2;
goFetch.enqueue(fetches);
setTimeout(() => goFetch.interval = 500, 2000);
.as-console-wrapper {
max-height: 100% !important
}


0
投票

class TaskRunner {
  tasks = [];
  isThreadFree = true;

  enqueue(task, cb) {
    this.tasks.push({
      task: task,
      callBack: cb,
    });
    this.checkIfTaskCanbeRun();
  }

  checkIfTaskCanbeRun() {
    if (this.isThreadFree && this.tasks.length != 0) {
      const taskToRun = this.tasks[0];
      this.tasks = this.tasks.splice(1);
      this.runTask(taskToRun);
    } else {
      console.warn("wait for thread to get free");
    }
  }

  runTask(taskToRun) {
    this.isThreadFree = false;
    const p = new Promise(taskToRun.task);

    p.then((data) => {
        taskToRun.callBack({
          data
        });
      })
      .catch((error) => {
        taskToRun.callBack({
          error
        });
      })
      .finally(() => {
        this.isThreadFree = true;
        this.checkIfTaskCanbeRun();
      });
  }
}
const runner = new TaskRunner();

const task1 = (pass) => {
  setTimeout(() => {
    pass("task 1 is successfully completed.");
  }, 3000);
};
const task2 = (pass, fail) => {
  setTimeout(() => {
    fail("task 2 is failed.");
  }, 5000);
};

const callback = ({
  data,
  error
}) => {
  console.log({
    data,
    error
  });
};

runner.enqueue(task1, callback);
runner.enqueue(task2, callback);

© www.soinside.com 2019 - 2024. All rights reserved.