我有一个Django项目,这个项目有很多自定义的管理命令,这些命令会从crontab执行。 我需要在这些命令中实现锁定,以防止cron启动多个实例的命令执行。 所以我正在检查我的每一个自定义管理命令,并改变。
def handle(self, *args, **options):
...
改为:
from lockmgr.lockmgr import LockMgr, Locked
...
def run_handle(self, *args, **options):
''' I renamed handle() to run_handle(). '''
...
def handle(self, *args, **options):
command = sys.argv[1]
try:
with LockMgr(f'{command}_lock') as l:
self.run_handle(*args, **options)
except Locked as e:
errmsg = f'{command} failed to lock. Reason: {e}.'
self.stdout.write(errmsg)
self.logger.error(errmsg)
然后我开始想,我可能应该在这里使用一个 "装饰器",但我不太确定如何做到这一点。 任何帮助将是巨大的。
谢谢。
啊,想通了,我需要创建一个python文件。 我需要创建一个python文件 ...
myprojectdecoratorslock_cron_job.py。
from lockmgr.lockmgr import LockMgr, Locked
import logging
import sys
def lock_cron_job():
"""Decorator that uses LockMgr."""
def decorator(handle_func):
def wrapped(*args, **kwargs):
command = sys.argv[1]
log = logging.getLogger(command)
try:
with LockMgr(f'{command}_lock'):
handle_func(*args, **kwargs)
except Locked as e:
errmsg = f'{command} failed to lock. Reason: {e}'
log.error(errmsg)
return wrapped
return decorator
然后在我的自定义管理命令,我会这样做。
from myproject.decorators.lock_cron_job import lock_cron_job
...
@lock_cron_job()
def handle(self, *args, **options):
...