我有多个线程构造一个参数化对象并在其上调用启动和停止。问题是,每个参数的底层服务只能在第一次启动时启动,并且只在最后一次停止时停止一次。我尝试了以下方法:
# /usr/bin/env python3
import multiprocessing.pool
import random
import threading
import time
import uuid
def log(args):
print(f"{threading.get_ident()}: {args}")
class Myobj:
lock = threading.Lock()
count = 0
def __init__(self, name: str):
self.name = name
def log(self, args):
log(f"{self.name}: {args}")
def start(self):
with self.lock:
self.count += 1
if self.count == 1:
self.value = uuid.uuid1()
self.log(f"starting {self.value}")
self.log(f"ref up {self.count}")
def stop(self):
with self.lock:
self.count -= 1
if self.count == 0:
self.log(f"stopping {self.value}")
self.log(f"ref down {self.count}")
def thread(i):
# references service with specific name
myobj = Myobj(f"name{i % 2}")
# only the first thread that gets here should start the service with that name
myobj.start()
# some_computation()
time.sleep(random.uniform(0, 2))
# only the last thread that gets here should stop the service with that name
myobj.stop()
size = 6
with multiprocessing.pool.ThreadPool(size) as tp:
tp.map(thread, range(size))
但是,计数器并不是唯一的:
140385706604224: name0: starting ab142d3c-2ebd-11ee-9d78-901b0e12b878
140385706604224: name0: ref up 1
140385698211520: name1: starting ab143264-2ebd-11ee-a67e-901b0e12b878
140385698211520: name1: ref up 1
140385689818816: name0: starting ab1435fc-2ebd-11ee-99f3-901b0e12b878
140385689818816: name0: ref up 1
140385681426112: name1: starting ab1439b2-2ebd-11ee-932c-901b0e12b878
140385681426112: name1: ref up 1
140385673033408: name0: starting ab143d36-2ebd-11ee-9959-901b0e12b878
140385673033408: name0: ref up 1
140385664640704: name1: starting ab1443f8-2ebd-11ee-a1b9-901b0e12b878
140385664640704: name1: ref up 1
140385673033408: name0: stopping ab143d36-2ebd-11ee-9959-901b0e12b878
140385673033408: name0: ref down 0
140385706604224: name0: stopping ab142d3c-2ebd-11ee-9d78-901b0e12b878
140385706604224: name0: ref down 0
140385689818816: name0: stopping ab1435fc-2ebd-11ee-99f3-901b0e12b878
140385689818816: name0: ref down 0
140385681426112: name1: stopping ab1439b2-2ebd-11ee-932c-901b0e12b878
140385681426112: name1: ref down 0
140385664640704: name1: stopping ab1443f8-2ebd-11ee-a1b9-901b0e12b878
140385664640704: name1: ref down 0
140385698211520: name1: stopping ab143264-2ebd-11ee-a67e-901b0e12b878
140385698211520: name1: ref down 0
理想情况下,我应该看到以
name0
和 name1
开始和结束的行打印一次,如下所示意味着服务 name0
和服务 name1
已启动,然后停止。 name0
和 name1
服务之间的顺序不相关。所以只输出如下所示:
140385706604224: name0: starting ab142d3c-2ebd-11ee-9d78-901b0e12b878
140385698211520: name1: starting ab143264-2ebd-11ee-a67e-901b0e12b878
140385681426112: name0: stopping ab142d3c-2ebd-11ee-9d78-901b0e12b878
140385664640704: name1: stopping ab143264-2ebd-11ee-a67e-901b0e12b878
什么是一个好的设计来实现这样的类,一次启动和停止具有多个线程的唯一命名服务?这可以作为泛型类实现吗?
只需让你的计数器成为一个可变对象,它就应该按照你想要的方式运行。
现在,你的类变量
count
是一个int
,一个不可变类型。如果这是一个可变类型,那么它将按照您期望的方式在实例之间共享其值。
class Foo:
x = 0
def incr(self):
self.x += 1
a = Foo()
b = Foo()
a.incr()
print(Foo.x, a.x, b.x) # change is not shared
但是如果您有自定义对象,则这有效:
class Counter:
def __init__(self, initial=0):
self._count = initial
@property
def count(self):
return self._count
def incr(self):
self._count += 1
return self.count
def decr(self):
self._count -= 1
return self.count
def __repr__(self):
return f'Counter({self.count!r})'
import multiprocessing.pool
import random
import threading
import time
import uuid
def log(args):
print(f"{threading.get_ident()}: {args}")
class Myobj:
lock = threading.Lock()
count = Counter()
def __init__(self, name: str):
self.name = name
def log(self, args):
log(f"{self.name}: {args}")
def start(self):
with self.lock:
self.count.incr()
if self.count.count == 1:
self.value = uuid.uuid1()
self.log(f"starting {self.value}")
self.log(f"ref up {self.count}")
def stop(self):
with self.lock:
self.count.decr()
if self.count.count == 0:
self.log(f"stopping {self.value}")
self.log(f"ref down {self.count}")
def thread(i):
# references service with specific name
myobj = Myobj(f"name{i % 2}")
# only the first thread that gets here should start the service with that name
myobj.start()
# some_computation()
time.sleep(random.uniform(0, 2))
# only the last thread that gets here should stop the service with that name
myobj.stop()
size = 6
with multiprocessing.pool.ThreadPool(size) as tp:
tp.map(thread, range(size))
6091517952: name0: starting 7fe4e7fc-2ec5-11ee-8745-9d96bc9d9be2
6091517952: name0: ref up Counter(1)
6092664832: name1: ref up Counter(2)
6093238272: name0: ref up Counter(3)
6094385152: name1: ref up Counter(4)
6094958592: name0: ref up Counter(5)
6095532032: name1: ref up Counter(6)
6094385152: name1: ref down Counter(5)
6093238272: name0: ref down Counter(4)
6095532032: name1: ref down Counter(3)
6092664832: name1: ref down Counter(2)
6094958592: name0: ref down Counter(1)
6091517952: name0: stopping 7fe4e7fc-2ec5-11ee-8745-9d96bc9d9be2
6091517952: name0: ref down Counter(0)