So this seemingly simple problem has been bugging me.
I have a dataset (datas) that I do some processing on (the processing itself isn't the problem, though it takes a while given the dataset's size) to produce multiple rows to store in a CSV file. However, producing one row, saving it to the CSV, producing the next row, saving it, and so on, is very cumbersome.
So I'm trying to implement producer and consumer threads: the producer generates each row of data (to speed up the processing) and puts it in a queue, and a consumer appends the rows to my CSV file.
My attempt below sometimes works (the data is saved correctly), but other times the data gets "cut off" (an entire row, or part of one).
What am I doing wrong?
from threading import Thread
from queue import Queue
import csv

q = Queue()

def producer():
    datas = [["hello","world"],["test","hey"],["my","away"],["your","gone"],["bye","hat"]]
    for data in datas:
        q.put(data)

def consumer():
    while True:
        local = q.get()
        file = open('dataset.csv','a')
        with file as fd:
            writer = csv.writer(fd)
            writer.writerow(local)
        file.close()
        q.task_done()

for i in range(10):
    t = Thread(target=consumer)
    t.daemon = True
    t.start()

producer()
q.join()
I think this is similar to what you're trying to do. For testing purposes, it prefixes each row of data in the resulting CSV file with a "producer ID", so you can see in the results where each row came from.
import csv
from queue import Queue
from threading import Thread

NUM_PRODUCERS = 10
SENTINEL = None

def producer(id):
    data = (("hello", "world"), ("test", "hey"), ("my", "away"), ("your", "gone"),
            ("bye", "hat"))
    for datum in data:
        q.put((id,) + datum)  # Prefix producer ID to datum for testing.

class Consumer(Thread):
    def run(self):
        with open('dataset.csv', 'w', newline='') as file:
            writer = csv.writer(file, delimiter=',')
            while True:
                datum = q.get()
                if datum is SENTINEL:
                    break
                writer.writerow(datum)
                q.task_done()

q = Queue()

# Create and start producer threads.
for id in range(NUM_PRODUCERS):
    t = Thread(target=producer, args=(id+1,))
    t.start()

# Create and start Consumer thread.
consumer = Consumer()
consumer.start()

# Wait until all items put in queue by producer threads have been processed by
# consumer.
q.join()

q.put(SENTINEL)  # Tell consumer thread it can quit.
consumer.join()
print('Done')
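If you later decide you want more than one consumer thread after all, two adjustments are needed: put one sentinel per consumer, and serialise access to the shared csv.writer with a lock (file writes from multiple threads are exactly what garbled your rows in the first place). Here is a minimal sketch of that variant; it writes to an in-memory io.StringIO instead of a file purely so the result is easy to inspect, and the names (NUM_CONSUMERS, write_lock, buffer) are my own, not from the code above:

```python
import csv
import io
from queue import Queue
from threading import Thread, Lock

SENTINEL = None
NUM_CONSUMERS = 3

q = Queue()
write_lock = Lock()       # Serialises access to the shared writer.
buffer = io.StringIO()    # Stand-in for the CSV file, for easy inspection.
writer = csv.writer(buffer)

def consumer():
    while True:
        datum = q.get()
        if datum is SENTINEL:
            q.task_done()
            break
        with write_lock:  # Only one thread writes a row at a time.
            writer.writerow(datum)
        q.task_done()

consumers = [Thread(target=consumer) for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()

for row in [("hello", "world"), ("test", "hey"), ("bye", "hat")]:
    q.put(row)

for _ in range(NUM_CONSUMERS):  # One sentinel per consumer thread.
    q.put(SENTINEL)

q.join()
for t in consumers:
    t.join()

print(sorted(buffer.getvalue().splitlines()))
# → ['bye,hat', 'hello,world', 'test,hey']
```

The rows may be consumed in any order, which is why the check sorts the output lines before printing.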