Processing very large text files in parallel with multiprocessing and threading


I have found several other questions that touch on this topic, but none that closely match my situation.

I have several very large text files (more than 3 GB each).

I would like to process them in parallel (say, two documents at a time) using multiprocessing. As part of the processing inside a single process I need to make API calls, so I would like each process to have its own threads running asynchronously to make those calls.

I have come up with a simplified example (I have commented the code to try to explain what I think it should be doing):

import multiprocessing
from threading import Thread
import threading
from queue import Queue
import time


def process_huge_file(*, file_, batch_size=250, num_threads=4):
    # create an APICaller instance for each process, each with its own Queue
    api_call = APICaller()
    batch = []
    # create threads that will run asynchronously to make API calls
    # I expect these to immediately block, since there is nothing in the Queue
    # (which is what api_call.run depends on to make a call)
    threads = []
    for i in range(num_threads):
        thread = Thread(target=api_call.run)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    ####
    # start processing the file line by line
    for line in file_:
        # if we are at our batch size, hand the batch to the api_call to let
        # the threads do their API calling
        if i % batch_size == 0:
            api_call.queue.put(batch)
        else:
            # add fake line to batch
            batch.append(fake_line)


class APICaller:
    def __init__(self):
        # thread-safe queue feeding the threads that point at this APICaller instance
        self.queue = Queue()

    def run(self):
        print("waiting for something to do")
        self.queue.get()
        print("processing item in queue")
        time.sleep(0.1)
        print("finished processing item in queue")


if __name__ == "__main__":
    # fake docs
    fake_line = "this is a fake line of some text"
    # two fake docs with line length == 1000
    fake_docs = [[fake_line] * 1000 for i in range(2)]
    ####
    num_processes = 2
    procs = []
    for idx, doc in enumerate(fake_docs):
        proc = multiprocessing.Process(target=process_huge_file, kwargs=dict(file_=doc))
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()
As the code stands, "waiting for something to do" prints eight times (which makes sense: four threads per process), and then it stops, or "deadlocks", which is not what I expect. I expect the threads to start sharing time with the main loop as soon as I begin putting items into the Queue, but the code never appears to get that far. Ordinarily I would step through the code to find the hang-up, but I still don't have a solid understanding of how best to debug with threads (a topic for another day).
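
A stripped-down sketch of the pattern I suspect is biting me (the names here are illustrative, not my real code): joining a thread that is blocked on queue.get() before anything has been put on the queue waits forever.

from threading import Thread
from queue import Queue

q = Queue()
worker = Thread(target=q.get)  # worker immediately blocks waiting for an item
worker.start()
worker.join()                  # hangs forever: the worker can never finish
q.put("item")                  # never reached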

In the meantime, can someone help me figure out why my code is not doing what it should be doing?

python multithreading multiprocessing python-multiprocessing python-multithreading
1 Answer
I made a few adjustments and additions, and the code now appears to do what it should. The main changes are: adding a CloseableQueue class (from Item 55 of Brett Slatkin's Effective Python), and making sure I call close and join on the queue so that the threads exit properly. Full code with these changes below:

import multiprocessing
from threading import Thread
import threading
from queue import Queue
import time

from concurrency_utils import CloseableQueue


def sync_process_huge_file(*, file_, batch_size=250):
    # synchronous baseline used for the timing comparison
    batch = []
    for idx, line in enumerate(file_):
        # do processing on the text
        if idx % batch_size == 0:
            time.sleep(0.1)  # stand-in for the API call
            batch = []
            # api_call.queue.put(batch)
        else:
            computation = 0
            for i in range(100000):
                computation += i
            batch.append(line)


def process_huge_file(*, file_, batch_size=250, num_threads=4):
    api_call = APICaller()
    batch = []
    # API-call threads
    threads = []
    for i in range(num_threads):
        thread = Thread(target=api_call.run)
        threads.append(thread)
        thread.start()
    for idx, line in enumerate(file_):
        # do processing on the text
        if idx % batch_size == 0:
            api_call.queue.put(batch)
        else:
            computation = 0
            for i in range(100000):
                computation += i
            batch.append(line)
    # one sentinel per thread so each worker's iterator terminates
    for _ in threads:
        api_call.queue.close()
    api_call.queue.join()
    for thread in threads:
        thread.join()


class APICaller:
    def __init__(self):
        self.queue = CloseableQueue()

    def run(self):
        # iteration blocks until an item (or the sentinel) arrives
        for item in self.queue:
            print("processing item in queue")
            time.sleep(0.1)
            print("finished processing item in queue")
        print("exiting run")


if __name__ == "__main__":
    # fake docs
    fake_line = "this is a fake line of some text"
    # two fake docs, 10000 lines each
    fake_docs = [[fake_line] * 10000 for i in range(2)]
    ####
    time_s = time.time()
    num_processes = 2
    procs = []
    for idx, doc in enumerate(fake_docs):
        proc = multiprocessing.Process(target=process_huge_file, kwargs=dict(file_=doc))
        proc.start()
        procs.append(proc)
    for proc in procs:
        proc.join()
    time_e = time.time()
    print(f"took {time_e - time_s}")


# concurrency_utils.py
class CloseableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # cause the consuming thread to exit
                yield item
            finally:
                self.task_done()
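
To see the close/join protocol in isolation, here is a minimal usage sketch (the worker function, thread count, and items are illustrative, not part of the code above) built on the CloseableQueue class:

from threading import Thread

queue = CloseableQueue()

def worker():
    # the loop exits cleanly once this thread consumes a SENTINEL
    for item in queue:
        print(f"handled {item}")

threads = [Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()

for item in ("a", "b", "c"):
    queue.put(item)

for _ in threads:
    queue.close()  # one SENTINEL per worker thread
queue.join()       # blocks until task_done() has run for every item
for t in threads:
    t.join()

Each worker consumes exactly one sentinel, which is why close is called once per thread; a single sentinel would release only one worker and leave the others blocked.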

As hoped, this is a huge speedup over running synchronously: 120 seconds down to 50 seconds.
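
On the side question of debugging hung threads: the standard library can show where each thread is blocked without stepping through in a debugger. A minimal sketch (the deliberately blocking thread is just for demonstration):

import faulthandler
import threading
from queue import Queue

q = Queue()
t = threading.Thread(target=q.get, daemon=True)  # deliberately blocks forever
t.start()

print(threading.enumerate())                   # list all live threads
faulthandler.dump_traceback(all_threads=True)  # dump every thread's current stack to stderr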