我目前正在编写一个小应用程序,它从 UDP 套接字获取 dataArray,然后 3 个不同的线程应该检查此 dataArray 中是否有属于它的项目(所有数据块都有 ID,如果 ID 匹配,则它属于该线程)。 3 个侧线程使用 while True 循环,因此一旦有新数据到达,它们总是会立即处理数据。
def mlFunc(inp):
while True:
temp =
if temp:
canDo.publish(temp)
这是侧线程使用的函数。 temp 应该是 canDo 需要的数据
def consumer(in_q):
while True:
canAr = canParser.b2can(in_q.get())
这是从 UDP 脚本获取数据的代码
我尝试使用全局变量,但是当我在副线程中使用该变量时,主线程就会冻结。 当我使用队列时,一旦一个线程获取数据,它就消失了。
如果有任何更好的方法来告诉这些线程发生了更新,那就更受欢迎了。 数据处理需要实时。
如果有3个消费者,则为每个消费者创建一个队列,并将所有消费者传递给发布者,并让发布者将消息推送给所有消费者。
import threading
import time
from queue import Queue
def publisher(consumers):
for x in range(10):
value = 2 ** x
for consumer in consumers:
consumer.put(value)
time.sleep(0.1)
for consumer in consumers:
consumer.put(None) # sentinel value to indicate end of stream
def consumer(name, queue):
while True:
value = queue.get()
if value is None:
print(f"{name} will quit now")
break
print(f"{name}: Got {value}")
def main():
consumer_threads = []
consumer_queues = []
for x in range(3):
queue = Queue()
consumer_queues.append(queue)
thread = threading.Thread(target=consumer, args=(f"Consumer {x}", queue))
thread.start()
consumer_threads.append(thread)
publisher_thread = threading.Thread(target=publisher, args=(consumer_queues,))
publisher_thread.start()
publisher_thread.join()
for thread in consumer_threads:
thread.join()
main()