asyncio 尝试获取锁而不等待它

问题描述 投票:0回答:2

我正在将一些线程代码转换为异步代码。

在线程代码中,我调用 threading.RLock.acquire(blocking = False, timeout = 0 )

似乎没有办法在不等待的情况下尝试获取 asyncio.Lock 。有没有办法做到这一点,如果是的话,我错过了什么?

如果有帮助,这是我的辅助函数:

@contextlib.contextmanager
def try_acquire_lock ( lock: gevent.lock.RLock ) -> Iterator[bool]:
    try:
        locked: bool = lock.acquire ( blocking = False, timeout = 0 )
        yield locked
    finally:
        if locked:
            lock.release()

这是我如何使用它的示例:

    def generate( self, cti: Freeswitch_acd_api ) -> bool:
        log = logger.getChild( 'Cached_data._generate' )
        if self.data_expiration and tz.utcnow() < self.data_expiration:
            log.debug( f'{type(self).__name__} data not expired yet' )
        else:
            with try_acquire_lock( self.lock ) as locked:
                if locked:
                    log.debug( f'{type(self).__name__} regenerating' )
                    try:
                        new_data = self._generate( cti )
                    except Freeswitch_error as e:
                        log.exception( 'FS error trying to generate data: %r', e )
                        return False
                    else:
                        self.data = new_data
                        self.data_expiration = tz.utcnow() + tz.timedelta( seconds=self.max_cache_seconds )
        return True

因为有人肯定会问“你为什么要这样做”,这是因为在某些情况下我有 3 个不同的线程(现在是任务),每个线程都连接到不同的服务器。这些任务负责使用来自每个服务器的信息来更新状态。我可以从任何一台服务器获取一些“全局”信息。如果一个任务已经在更新该全局信息,我不希望另一个任务重复该工作,因此我使用锁来控制当前谁正在执行该过程。我需要能够从所有服务器获取信息的原因是因为有时会删除一台服务器进行维护,这是我能想到的最简单、最万无一失的方法来实现它,而无需创建与服务器的额外连接。

python multithreading locking python-asyncio
2个回答
0
投票

您可以尝试使用

Lock.locked()
方法,该方法返回一个布尔值,指示当前是否持有锁。您可以使用此方法检查当前是否由其他任务持有锁,如果是,您可以选择跳过需要锁的代码或采取其他操作。

如何使用

Lock.locked()
尝试在不等待的情况下获取锁的示例:

import asyncio


async def main():
    lock = asyncio.Lock()

    if not lock.locked():
        # Lock is not held by another task, so we can acquire it
        async with lock:
            print("Acquired lock")
            # Do some work that requires the lock here
    else:
        # Lock is already held by another task, so we cannot acquire it
        print("Lock is held by another task, skipping code that requires the lock")

asyncio.run(main())

或者,您可以使用

Lock.acquire()
方法并向其传递
blocking=False
参数来尝试获取锁而无需等待。如果成功获取锁,则返回
True
;如果未成功获取锁,则返回
False

    import asyncio
    
    async def main():
    lock = asyncio.Lock()
    
    if await lock.acquire(blocking=False):
        # Lock was acquired successfully
        print("Acquired lock")
        # Do some work that requires the lock here
        lock.release()
    else:
        # Lock is already held by another task, so we cannot acquire it
        print("Lock is held by another task, skipping code that requires the lock")

asyncio.run(main())

告诉我是否有效。


0
投票

我相信 Sergio 建议的使用

Lock.locked()
的方法是正确的,只要你立即尝试获取锁,即沿着

if not lock.locked():
    await lock.acquire()

原因是,在 asyncio 中,代码在单个事件循环中运行,上下文切换发生在显式的 await 点。而且因为

lock.locked()
不会等待任何东西,并且
acquire
紧随其后,其他任务甚至没有机会干扰(与传统线程不同,传统线程可能会发生这种情况)。

© www.soinside.com 2019 - 2024. All rights reserved.