I have a Flask server with multiple processes. The idea is that there is work to be done, and the work is sent to worker processes over websockets. Jobs are stored in Redis and look like this:
JOB_QUE:JOBID : {
    work: [ { work_id: ..., files: [ { FilePath: ... }, ... ] } ]
}
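For context, each job is stored as a single JSON string under its `JOB_QUE:` key, so the whole document is read and written in one piece. A minimal round-trip sketch of that shape (field values here are made up for illustration):

```python
import json

# Hypothetical job payload matching the shape above (example values, not real data)
job = {
    "work": [
        {"work_id": 1, "status": None, "files": [{"FilePath": "/tmp/a.dat"}]},
    ]
}

# Stored in Redis as one JSON string under a key like "JOB_QUE:<JOBID>"
payload = json.dumps(job)

# Workers read the whole document back with json.loads
restored = json.loads(payload)
print(restored["work"][0]["files"][0]["FilePath"])
```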
Each job instance contains an array of files being processed. This is how I try to access the Redis data when handing work out to workers:
for key in redis.scan_iter("JOB_QUE:*"):
    Job = json.loads(redis.get(key))
    if Job is not None:
        for work in Job['work']:
            # Assign this work item to this worker
            if work['status'] is None:
                lock_key = f"{key}:{work['work_id']}:lock"
                with redis.pipeline() as pipe:
                    try:
                        # Watch the lock key for changes
                        pipe.watch(lock_key)
                        # Check if the lock is already acquired
                        lock_value = pipe.get(lock_key)
                        if lock_value is None:
                            # Lock is not acquired: set the lock and assign the work item
                            pipe.multi()
                            pipe.set(lock_key, 1)
                            if work is not None:
                                # Do assign the work
                                current_time = datetime.now()
                                work['status'] = 'Pending'
                                emit('receiveWork', work, room=request.sid)
                                leave_room('Ready')
                                join_room('Working')
                                pipe.set(key, json.dumps(Job))
                                pipe.delete(lock_key)
                                pipe.execute()
                                break
                        else:
                            # This work item is locked; keep looking
                            pass
                    except Exception as e:
                        # The lock key changed while we were watching it
                        pass
So in this case I am taking a lock on a single element of the `work` array inside the parent key `JOB_QUE:XXX`, but it doesn't work. This function will likely run around 50 times at roughly the same moment, yet the same work item is being sent to 10+ workers; the lock table doesn't seem to hold.
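For what it's worth, the behavior I was expecting from the lock is test-and-set in a single atomic step. A minimal in-memory sketch (no real Redis; `store`, `get`, and `set_nx` are hypothetical stand-ins) of why a separate GET followed by SET can race, while an atomic set-if-absent cannot:

```python
# Hypothetical in-memory stand-in for a Redis string store
store = {}

def get(key):
    return store.get(key)

def set_nx(key, value):
    # Atomic set-if-absent (like Redis's SET with the NX option):
    # only the first caller for a given key gets True
    if key in store:
        return False
    store[key] = value
    return True

# Racy GET-then-SET: both workers observe "no lock" before either writes
a_sees = get("lock")   # worker A checks: no lock
b_sees = get("lock")   # worker B checks before A writes: also no lock
store["lock"] = "A"    # A "acquires" the lock
store["lock"] = "B"    # B "acquires" it too -> both proceed with the same work

# Atomic set-if-absent: exactly one winner
store.clear()
a_won = set_nx("lock", "A")
b_won = set_nx("lock", "B")
print(a_won, b_won)
```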