假设您想要广播生成器生成的数据。有很多消费者接收数据并消费它。生成器将生成有关是否有消费者的数据。每个加入的消费者从加入的那一刻起就可以收到数据。最好的解决方案是使用观察者模式,但是如果消费者的数量如此之多并且产生信息的速率也如此之高怎么办?在这种情况下,当数据 1 可用并且您正在向所有观察者广播数据时,数据 2 已准备就绪,并且您尚未完成数据 1 的广播。我在想这个问题有没有办法解决?我想到的一件事是:是否有可能某个线程在共享空间中执行相同的函数?换句话说,是否可以运行一次函数,然后所有线程一起获得结果。
看这段代码:
class data_provider():
def __init__(self) -> None:
self.cnt = 0
self.stop = False
self.data_received = Event()
self.data_received.clear()
data_generator_thread = threading.Thread(target= self.data_generator)
data_generator_thread.start()
def data_sender(self):
self.data_received.wait()
while not self.stop:
yield { 'Name': 'Data', 'Count': self.cnt}
self.data_received.clear()
self.data_received.wait()
def data_generator(self):
while(self.cnt < 20):
self.cnt += 1
self.data_received.set()
sleep(1)
self.stop = True
self.data_received.set()
和main.py
obj = data_provider()
def printing(id):
for data in obj.data_sender():
print(str(id) + ' : ' + str(data))
our_threads = []
for i in range(100):
our_threads.append(Thread(target= printing, args=(i,)))
our_threads[i].start()
for i in range(100):
our_threads[i].join()
print('End')
如果所有线程都运行 data_sender() 一次,并且一旦其他线程收到信息,所有线程都会得到相同的结果。
有什么想法吗?
如果您希望拥有某些数据的单个生产者和该数据的多个“消费者”,则以下代码使用带有通知的
Condition
实例。出于演示目的,生产者仅生成 5 条数据,而我们有 3 个消费者。这将使输出保持合理的长度:
使用链表更新
import threading
class Node:
def __init__(self, data, previous_node):
self.next = None
self.data = data
if previous_node:
previous_node.next = self
def __repr__(self):
next = f'({repr(self.next)})' if self.next else None
return f'data: {repr(self.data)}, next: {next}'
class data_provider():
def __init__(self) -> None:
self.condition = threading.Condition()
self.running = True
# To simplify the code, the first node in the list is a dummy:
self.linked_list = Node(None, None)
data_generator_thread = threading.Thread(target=self.data_generator)
data_generator_thread.start()
def data_generator(self):
import time
last_node = self.linked_list
for cnt in range(1, 6) : # Reduced count for demo purposes
last_node = Node({'Name': 'Data', 'Count': cnt}, last_node)
with self.condition:
self.condition.notify_all()
print('Done producing')
# Let consumers know that no more data will be coming:
with self.condition:
self.running = False
self.condition.notify_all()
N_PRINTERS = 3 # The number of printer threads:
obj = data_provider()
def printing(id):
last_node = obj.linked_list
while True:
with obj.condition:
obj.condition.wait_for(
lambda: not obj.running or last_node.next
)
if not last_node.next:
return
last_node = last_node.next
print(id, ':', last_node.data)
while last_node.next:
last_node = last_node.next
print(id, ':', last_node.data)
printer_threads = []
for i in range(N_PRINTERS):
thread = threading.Thread(target=printing, args=(i,))
thread.start()
printer_threads.append(thread)
for thread in printer_threads:
thread.join()
print('End')
打印:
Done producing
0 : {'Name': 'Data', 'Count': 1}
0 : {'Name': 'Data', 'Count': 2}
1 : {'Name': 'Data', 'Count': 1}
0 : {'Name': 'Data', 'Count': 3}
1 : {'Name': 'Data', 'Count': 2}
0 : {'Name': 'Data', 'Count': 4}
1 : {'Name': 'Data', 'Count': 3}
0 : {'Name': 'Data', 'Count': 5}
1 : {'Name': 'Data', 'Count': 4}
2 : {'Name': 'Data', 'Count': 1}
1 : {'Name': 'Data', 'Count': 5}
2 : {'Name': 'Data', 'Count': 2}
2 : {'Name': 'Data', 'Count': 3}
2 : {'Name': 'Data', 'Count': 4}
2 : {'Name': 'Data', 'Count': 5}
End