NodeJS socketio 竞争条件

问题描述 投票:0回答:1

我正在使用套接字io进行缓存,我需要非常快速地更新我的值。 (20个同时请求),最后是一个要检查的get请求,但是,我的get返回更新完成之前的值,也就是说,我期望结果为20,而我的get返回10, 11,等等。我在 catch 函数中设置了超时,它起作用了,但性能不是很好。

我已经尝试过使用互斥量、信号量和锁,但没有成功。有人可以帮忙吗?

代码如下:

const { Server } = require("socket.io");
const io = new Server({});

var cache = {};

io.on('connection', (socket) => {
    socket.on('clear', () => {
        cache = {};
    });


    socket.on('addToQueue', async (data, cb) => {
        if (data.type == `get`) {
            // function that await a time
            await new Promise(resolve => setTimeout(() => resolve(true), 20))
            cb({ value: cache[data.key] })
        }

        if (data.type == `transaction`) {
            //performs a calculation on each request and edits the cache
        }
    });
});

(async () => {
    io.listen(6456);
    console.log(`Server listening at *:${PORT}`);
})()

避免竞争条件,而不是每次都进行不必要的超时。

javascript node.js caching socket.io race-condition
1个回答
0
投票

为了处理竞争条件问题并确保您的获取请求仅在所有更新完成后返回,您可以实现更健壮的同步机制。鉴于您已经尝试过互斥体、信号量和锁但没有成功,问题似乎可能在于这些工具的应用方式,而不是它们对任务的适用性。

一种有效的方法是在允许 get 请求继续之前使用 Promise 和计数器来跟踪所有事务操作的完成情况。此方法避免了任意超时,这可能既低效又不可靠。

这是代码的修订版本,它实现了基于承诺的基本同步机制:

const { Server } = require("socket.io");
const io = new Server({});
var cache = {};

// Counter to keep track of ongoing transactions
let ongoingTransactions = 0;
// Array to store resolve functions of promises waiting for transactions to complete
let waitingForCompletion = [];

io.on('connection', (socket) => {
    socket.on('clear', () => {
        cache = {};
        // Reset the counter and clear waiting list without resolving, as it's a reset operation
        ongoingTransactions = 0;
        waitingForCompletion = [];
    });

    socket.on('addToQueue', async (data, cb) => {
        if (data.type === 'get') {
            if (ongoingTransactions > 0) {
                // Wait for all transactions to complete
                await new Promise(resolve => waitingForCompletion.push(resolve));
            }
            cb({ value: cache[data.key] });
        } else if (data.type === 'transaction') {
            // Increment the counter at the start of a transaction
            ongoingTransactions++;
            try {
                // Perform the transaction...
                // This is where you'd modify the cache as per your transaction logic
                // For demonstration, let's simulate it with a timeout
                await new Promise(resolve => setTimeout(resolve, 20)); // Simulate async work

                // Decrement the counter when the transaction is complete
                ongoingTransactions--;
                if (ongoingTransactions === 0) {
                    // If there are no more ongoing transactions, resolve all waiting promises
                    waitingForCompletion.forEach(resolve => resolve());
                    waitingForCompletion = []; // Clear the waiting list
                }
            } catch (error) {
                // Handle any errors that occur during the transaction
                console.error("Error during transaction:", error);
                ongoingTransactions--;
                if (ongoingTransactions === 0) {
                    waitingForCompletion.forEach(resolve => resolve());
                    waitingForCompletion = [];
                }
            }
        }
    });
});

(async () => {
    const PORT = 6456;
    io.listen(PORT);
    console.log(`Server listening at *:${PORT}`);
})();

该解决方案使用计数器来跟踪正在进行的交易数量。当收到获取请求时,它会检查是否有任何事务正在进行。如果有,它会通过创建一个新的 Promise 并将其解析函数添加到数组中来等待。一旦事务完成,它就会递减计数器,如果没有更多事务正在进行,它会解析所有存储的承诺,允许获取请求继续并获取更新的缓存值。

这种方法确保只有在所有事务完成后才会继续获取请求,而不依赖于任意超时,从而提高性能和可靠性。

© www.soinside.com 2019 - 2024. All rights reserved.