我正在尝试在 JavaScript 中实现一个动态的、自我调整的优先级队列,该队列与异步任务一起运行。目标是创建一个优先级队列,可以随时添加具有给定优先级的任务。然而,复杂性来自于任务优先级可以根据外部异步事件动态改变的要求。
具体要求如下:
动态任务添加:队列应允许随时添加具有指定优先级的任务(功能对象)。
异步任务执行: 任务应使用 async/await 异步执行。
优先级调整:应该有一种方法可以根据外部异步触发器(例如从API获取数据)来调整队列中已有任务的优先级。队列应根据新的优先级自动重新排序任务。
并发控制:系统应同时处理最多N个任务,其中N是可配置参数。
事件处理:系统应在每个任务完成后发出一个事件并优雅地处理错误,而不停止队列处理。
任务取消:提供取消排队任务的机制。
我已经做了一些初步研究,并了解到这可能涉及异步/等待、Promises 的组合,也许还涉及某种形式的事件处理或反应式编程,但我正在努力弄清楚如何以一种可扩展且高效的方式。
有人可以为这样的系统提供指导或参考实现吗?任何有关相关模式或库的帮助或指示将不胜感激。
class AsyncPriorityQueue {
constructor() {
this.queue = [];
this.processing = false;
}
// Add task to the queue with a given priority
add(task, priority) {
this.queue.push({ task, priority });
this.queue.sort((a, b) => a.priority - b.priority);
this.processQueue();
}
// Process the queue
async processQueue() {
if (this.processing || this.queue.length === 0) {
return;
}
this.processing = true;
const { task } = this.queue.shift();
try {
await task();
} catch (error) {
console.error("Task failed:", error);
}
this.processing = false;
this.processQueue();
}
adjustPriority(taskId, newPriority) {}
}
// Usage Example
const myQueue = new AsyncPriorityQueue();
// Example tasks
const task1 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 1'));
const task2 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 2'));
myQueue.add(task1, 2);
myQueue.add(task2, 1);
该实现通过允许并行处理多个任务(最多达到并发设置的限制)来解决并发控制问题,动态调整任务优先级,并提供取消任务的方法。队列始终在新任务出队之前进行排序,确保按正确的优先级顺序处理任务。
class FixedAsyncPriorityQueue {
constructor(concurrency = 1) {
this.queue = [];
this.concurrency = concurrency;
this.currentTasks = new Set();
this.taskCounter = 0;
}
// Add task to the queue with a given priority
add(task, priority) {
const taskId = this.taskCounter++;
this.queue.push({ task, taskId, priority });
this.processQueue();
return taskId;
}
// Re-sort the queue when priorities change
sortQueue() {
this.queue.sort((a, b) => a.priority - b.priority);
}
// Process the queue with concurrency control
async processQueue() {
while (this.queue.length > 0 && this.currentTasks.size < this.concurrency) {
this.sortQueue();
const { task, taskId } = this.queue.shift();
this.currentTasks.add(taskId);
task().then(() => {
this.currentTasks.delete(taskId);
this.processQueue();
}).catch(error => {
console.error(`Task ${taskId} failed:`, error);
this.currentTasks.delete(taskId);
this.processQueue();
});
}
}
// Adjust the priority of a specific task
adjustPriority(taskId, newPriority) {
const taskIndex = this.queue.findIndex(task => task.taskId === taskId);
if (taskIndex > -1) {
this.queue[taskIndex].priority = newPriority;
this.sortQueue();
}
}
// Cancel a task
cancelTask(taskId) {
const taskIndex = this.queue.findIndex(task => task.taskId === taskId);
if (taskIndex > -1) {
this.queue.splice(taskIndex, 1);
}
}
}
// Usage example
const myQueue = new FixedAsyncPriorityQueue(2); // Set concurrency to 2
// Example tasks
const task1 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 1'));
const task2 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 2'));
const taskId1 = myQueue.add(task1, 2);
const taskId2 = myQueue.add(task2, 1);