修复动态、自我调整的优先级队列

问题描述 投票:0回答:1

我正在尝试在 JavaScript 中实现一个动态的、自我调整的优先级队列,该队列与异步任务一起运行。目标是创建一个优先级队列,可以随时添加具有给定优先级的任务。然而,复杂性来自于任务优先级可以根据外部异步事件动态改变的要求。

具体要求如下:

动态任务添加:队列应允许随时添加具有指定优先级的任务(功能对象)。

异步任务执行: 任务应使用 async/await 异步执行。

优先级调整:应该有一种方法可以根据外部异步触发器(例如从API获取数据)来调整队列中已有任务的优先级。队列应根据新的优先级自动重新排序任务。

并发控制:系统应同时处理最多N个任务,其中N是可配置参数。

事件处理:系统应在每个任务完成后发出一个事件并优雅地处理错误,而不停止队列处理。

任务取消:提供取消排队任务的机制。

我已经做了一些初步研究,并了解到这可能涉及异步/等待、Promises 的组合,也许还涉及某种形式的事件处理或反应式编程,但我正在努力弄清楚如何以一种可扩展且高效的方式。

有人可以为这样的系统提供指导或参考实现吗?任何有关相关模式或库的帮助或指示将不胜感激。

class AsyncPriorityQueue {
    constructor() {
        this.queue = [];
        this.processing = false;
    }

    // Add task to the queue with a given priority
    add(task, priority) {
        this.queue.push({ task, priority });
        this.queue.sort((a, b) => a.priority - b.priority);
        this.processQueue();
    }

    // Process the queue
    async processQueue() {
        if (this.processing || this.queue.length === 0) {
            return;
        }
        this.processing = true;

        const { task } = this.queue.shift();
        try {
            await task();
        } catch (error) {
            console.error("Task failed:", error);
        }

        this.processing = false;
        this.processQueue();
    }

    
    adjustPriority(taskId, newPriority) {}
}

// Usage Example
const myQueue = new AsyncPriorityQueue();

// Example tasks
const task1 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 1'));
const task2 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 2'));

myQueue.add(task1, 2);
myQueue.add(task2, 1);

javascript asynchronous async-await
1个回答
0
投票

该实现通过允许并行处理多个任务(最多达到并发设置的限制)来解决并发控制问题,动态调整任务优先级,并提供取消任务的方法。队列始终在新任务出队之前进行排序,确保按正确的优先级顺序处理任务。

class FixedAsyncPriorityQueue {
    constructor(concurrency = 1) {
        this.queue = [];
        this.concurrency = concurrency;
        this.currentTasks = new Set();
        this.taskCounter = 0;
    }

    // Add task to the queue with a given priority
    add(task, priority) {
        const taskId = this.taskCounter++;
        this.queue.push({ task, taskId, priority });
        this.processQueue();
        return taskId;
    }

    // Re-sort the queue when priorities change
    sortQueue() {
        this.queue.sort((a, b) => a.priority - b.priority);
    }

    // Process the queue with concurrency control
    async processQueue() {
        while (this.queue.length > 0 && this.currentTasks.size < this.concurrency) {
            this.sortQueue();
            const { task, taskId } = this.queue.shift();
            this.currentTasks.add(taskId);

            task().then(() => {
                this.currentTasks.delete(taskId);
                this.processQueue();
            }).catch(error => {
                console.error(`Task ${taskId} failed:`, error);
                this.currentTasks.delete(taskId);
                this.processQueue();
            });
        }
    }

    // Adjust the priority of a specific task
    adjustPriority(taskId, newPriority) {
        const taskIndex = this.queue.findIndex(task => task.taskId === taskId);
        if (taskIndex > -1) {
            this.queue[taskIndex].priority = newPriority;
            this.sortQueue();
        }
    }

    // Cancel a task
    cancelTask(taskId) {
        const taskIndex = this.queue.findIndex(task => task.taskId === taskId);
        if (taskIndex > -1) {
            this.queue.splice(taskIndex, 1);
        }
    }
}

// Usage example
const myQueue = new FixedAsyncPriorityQueue(2); // Set concurrency to 2

// Example tasks
const task1 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 1'));
const task2 = () => new Promise(resolve => setTimeout(resolve, 1000, 'Task 2'));

const taskId1 = myQueue.add(task1, 2);
const taskId2 = myQueue.add(task2, 1);

© www.soinside.com 2019 - 2024. All rights reserved.