我想从网站下载并处理大量文件。该站点的服务条款限制了您每秒允许下载的文件数。
处理文件所需的时间实际上是瓶颈,所以我希望能够并行处理多个文件。但我不希望不同的进程结合起来违反下载限制。所以我需要一些限制过度请求率的东西。我正在考虑以下内容,但我并不是multiprocessing
模块的专家。
import multiprocessing
from multiprocessing.managers import BaseManager
import time
class DownloadLimiter(object):
def __init__(self, time):
self.time = time
self.lock = multiprocessing.Lock()
def get(self, url):
self.lock.acquire()
time.sleep(self.time)
self.lock.release()
return url
class DownloadManager(BaseManager):
pass
DownloadManager.register('downloader', DownloadLimiter)
class Worker(multiprocessing.Process):
def __init__(self, downloader, queue, file_name):
super().__init__()
self.downloader = downloader
self.file_name = file_name
self.queue = queue
def run(self):
while not self.queue.empty():
url = self.queue.get()
content = self.downloader.get(url)
with open(self.file_name, "a+") as fh:
fh.write(str(content) + "\n")
然后在其他地方运行下载
manager = DownloadManager()
manager.start()
downloader = manager.downloader(0.5)
queue = multiprocessing.Queue()
urls = range(50)
for url in urls:
queue.put(url)
job1 = Worker(downloader, queue, r"foo.txt")
job2 = Worker(downloader, queue, r"bar.txt")
jobs = [job1, job2]
for job in jobs:
job.start()
for job in jobs:
job.join()
这似乎是小规模的工作,但我有点担心锁定是否真的正确完成。
此外,如果有更好的模式来实现相同的目标,我很乐意听到它。
最简单的方法是在主线程上下载并将文档提供给工作池。
在我自己的实现中,我已经走了使用芹菜来处理文档和使用gevent进行下载的途径。只有更复杂的情况才能做同样的事情。
这是一个简单的例子。
import multiprocessing
from multiprocessing import Pool
import time
import typing
def work(doc: str) -> str:
# do some processing here....
return doc + " processed"
def download(url: str) -> str:
return url # a hack for demo, use e.g. `requests.get()`
def run_pipeline(
urls: typing.List[str],
session_request_limit: int = 10,
session_length: int = 60,
) -> None:
"""
Download and process each url in `urls` at a max. rate limit
given by `session_request_limit / session_length`
"""
workers = Pool(multiprocessing.cpu_count())
results = []
n_requests = 0
session_start = time.time()
for url in urls:
doc = download(url)
results.append(
workers.apply_async(work, (doc,))
)
n_requests += 1
if n_requests >= session_request_limit:
time_to_next_session = session_length - time.time() - session_start
time.sleep(time_to_next_session)
if time.time() - session_start >= session_length:
session_start = time.time()
n_requests = 0
# Collect results
for result in results:
print(result.get())
if __name__ == "__main__":
urls = ["www.google.com", "www.stackoverflow.com"]
run_pipeline(urls)
这可以使用Ray干净地完成,ray.init(num_cpus=4, resources={'Network': 2})
是一个用于并行和分布式Python的库。
Ray的资源
当您启动Ray时,您可以告诉它该计算机上有哪些可用资源。 Ray将自动尝试确定CPU核心数量和GPU数量,但这些可以指定,实际上也可以传入任意用户定义的资源,例如通过调用
Network
这告诉Ray该机器有4个CPU核心和2个用户定义的资源,名为@ray.remote(resources={'Network': 1})
def f():
pass
。
每个Ray“任务”是一个可调度的工作单元,具有一定的资源要求。默认情况下,任务将需要1个CPU内核而不需要其他内容。但是,可以通过声明相应的函数来指定任意资源要求
f
这告诉Ray,为了让Network
在“worker”进程上执行,必须有1个CPU核心(默认值)和1个Network
资源。
由于机器有2个f
资源和4个CPU核心,因此最多可以同时执行2个g
副本。另一方面,如果有另一个函数@ray.remote
def g():
pass
声明
g
然后可以同时执行四份f
,或者同时执行两份g
和两份import ray
import time
max_concurrent_downloads = 2
ray.init(num_cpus=4, resources={'Network': max_concurrent_downloads})
@ray.remote(resources={'Network': 1})
def download_content(url):
# Download the file.
time.sleep(1)
return 'result from ' + url
@ray.remote
def process_result(result):
# Process the result.
time.sleep(1)
return 'processed ' + result
urls = ['url1', 'url2', 'url3', 'url4']
result_ids = [download_content.remote(url) for url in urls]
processed_ids = [process_result.remote(result_id) for result_id in result_ids]
# Wait until the tasks have finished and retrieve the results.
processed_results = ray.get(processed_ids)
。
例
这是一个示例,其中包含用于下载内容和处理内容的实际函数的占位符。
ray timeline
这是一个时间线描述(您可以通过从命令行运行download_content
并在chrome://在Chrome Web浏览器中跟踪打开生成的JSON文件来生成)。
在上面的脚本中,我们提交了4个Network
任务。这些是我们通过指定它们需要process_result
资源(除了默认的1 CPU资源)来限制速率的那些。然后我们提交4个download_content
任务,每个任务都需要默认的1个CPU资源。任务分三个阶段执行(只需看蓝框)。
process_result
任务,这个任务可以同时执行(因为速率限制)。我们还不能执行任何download_content
任务,因为它们依赖于download_content
任务的输出。process_result
任务以及两个process_result
任务,因为我们不限制process_result
任务。每个“行”是一个工作进程。时间从左到右。
您可以在ratelimit
上查看更多有关如何执行此操作的信息。
有一个完全符合您需求的库,名为from ratelimit import limits
import requests
FIFTEEN_MINUTES = 900
@limits(calls=15, period=FIFTEEN_MINUTES)
def call_api(url):
response = requests.get(url)
if response.status_code != 200:
raise Exception('API response: {}'.format(response.status_code))
return response
他们主页的例子:
此功能将无法在15分钟的时间内拨打超过15个API。
e
顺便说一句,在I / O密集型任务(例如Web爬网)中,您可以使用多线程,而不是多处理。在使用多处理时,您必须创建另一个控制流程,并协调您执行的所有操作。在多线程方法的情况下,所有线程本身都可以访问主进程内存,因此信令变得更容易(因为线程之间共享import logging
import threading
import time
logging.basicConfig(level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
logging.debug('wait_for_event starting')
event_is_set = e.wait()
logging.debug('event set: %s', event_is_set)
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
while not e.isSet():
logging.debug('wait_for_event_timeout starting')
event_is_set = e.wait(t)
logging.debug('event set: %s', event_is_set)
if event_is_set:
logging.debug('processing event')
else:
logging.debug('doing other work')
e = threading.Event()
t1 = threading.Thread(name='block',
target=wait_for_event,
args=(e,))
t1.start()
t2 = threading.Thread(name='non-block',
target=wait_for_event_timeout,
args=(e, 2))
t2.start()
logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')
):
#!/usr/bin/env python3
import os
import time
import random
from functools import partial
from multiprocessing import Pool, Manager
CPU_NUM = 4
CONCURRENT_DOWNLOADS = 2
def download(url, semaphore):
pid = os.getpid()
with semaphore:
print('Process {p} is downloading from {u}'.format(p=pid, u=url))
time.sleep(random.randint(1, 5))
# Process the obtained resource:
time.sleep(random.randint(1, 5))
return 'Successfully processed {}'.format(url)
def main():
manager = Manager()
semaphore = manager.Semaphore(CONCURRENT_DOWNLOADS)
target = partial(download, semaphore=semaphore)
urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]
with Pool(processes=CPU_NUM) as pool:
results = pool.map(target, urls)
print(results)
if __name__ == '__main__':
main()
在“限速下载”下你的意思并不是很清楚。在这种情况下,它是一些并发下载,这是一个常见的用例,我认为简单的解决方案是使用信号量与进程池:
CONCURRENT_DONWLOADS
正如您所看到的,一次只下载#!/usr/bin/env python3
import os
import time
import random
from threading import Thread
from multiprocessing import Process, JoinableQueue
WORKERS = 4
DOWNLOADS_PER_SECOND = 2
def download_resource(url, resource_queue):
pid = os.getpid()
t = time.strftime('%H:%M:%S')
print('Thread {p} is downloading from {u} ({t})'.format(p=pid, u=url, t=t),
flush=True)
time.sleep(random.randint(1, 10))
results = '[resource {}]'.format(url)
resource_queue.put(results)
def process_resource(resource_queue):
pid = os.getpid()
while True:
res = resource_queue.get()
print('Process {p} is processing {r}'.format(p=pid, r=res),
flush=True)
time.sleep(random.randint(1, 10))
resource_queue.task_done()
def main():
resource_queue = JoinableQueue()
# Start process workers:
for _ in range(WORKERS):
worker = Process(target=process_resource,
args=(resource_queue,),
daemon=True)
worker.start()
urls = ['https://link/to/resource/{i}'.format(i=i) for i in range(10)]
while urls:
target_urls = urls[:DOWNLOADS_PER_SECOND]
urls = urls[DOWNLOADS_PER_SECOND:]
# Start downloader threads:
for url in target_urls:
downloader = Thread(target=download_resource,
args=(url, resource_queue),
daemon=True)
downloader.start()
time.sleep(1)
resource_queue.join()
if __name__ == '__main__':
main()
进程,而其他进程正忙于处理获得的资源。
好的,经过OP的以下澄清
通过“每秒下载次数”,我的意思是全球每秒只有下载次数。
我决定发布另一个答案,因为我认为我的第一个答案对于那些希望限制多个并发运行进程的人来说也是有意义的。
我认为没有必要使用其他框架来解决这个问题。我们的想法是使用为每个资源链接生成的下载线程,资源队列和固定数量的处理工作程序,它们是进程,而不是线程:
$ ./limit_download_rate.py
Thread 32482 is downloading from https://link/to/resource/0 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/1 (10:14:08)
Thread 32482 is downloading from https://link/to/resource/2 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/3 (10:14:09)
Thread 32482 is downloading from https://link/to/resource/4 (10:14:10)
Thread 32482 is downloading from https://link/to/resource/5 (10:14:10)
Process 32483 is processing [resource https://link/to/resource/2]
Process 32484 is processing [resource https://link/to/resource/0]
Thread 32482 is downloading from https://link/to/resource/6 (10:14:11)
Thread 32482 is downloading from https://link/to/resource/7 (10:14:11)
Process 32485 is processing [resource https://link/to/resource/1]
Process 32486 is processing [resource https://link/to/resource/3]
Thread 32482 is downloading from https://link/to/resource/8 (10:14:12)
Thread 32482 is downloading from https://link/to/resource/9 (10:14:12)
Process 32484 is processing [resource https://link/to/resource/6]
Process 32485 is processing [resource https://link/to/resource/9]
Process 32483 is processing [resource https://link/to/resource/8]
Process 32486 is processing [resource https://link/to/resource/4]
Process 32485 is processing [resource https://link/to/resource/7]
Process 32483 is processing [resource https://link/to/resource/5]
结果看起来像这样:
DOWNLOADS_PER_SECOND
这里,每个第二个WORKERS
线程都在启动,在本例中为两个,然后下载并将资源放入队列。 qazxswpoi是一些从队列中获取资源以进行进一步处理的进程。通过此设置,您将能够限制每秒启动的下载次数,并让工作人员并行处理获得的资源。