My producer is a local sqlitedict (which iterates very fast). My consumer fetches web pages. I need to limit the number of concurrent page fetches, so I came up with:
from scraping import xpath_soup, get_page_content, links_from_soup
import asyncio
from sqlitedict import SqliteDict
from bs4 import BeautifulSoup

DB_PUBS = SqliteDict("data/publishers.sqlite")
PUB_BATCH_SIZE = 10

async def process_publisher(publisher, semaphore):
    # Scrape all the links from the publisher URL
    page_src = await get_page_content(publisher['url'])
    if page_src is not None:
        soup = BeautifulSoup(page_src, 'html.parser')
        page_links = links_from_soup(soup, publisher['url'])
        print(f"Found {len(page_links)} links for publisher {publisher['url']}")
    semaphore.release()

async def process_publisher_queue(publisher_queue, semaphore):
    while True:
        publisher = await publisher_queue.get()
        # spawn a task to process a publisher from the queue
        asyncio.create_task(process_publisher(publisher, semaphore))

async def main():
    # Get a batch of publishers
    publisher_queue = asyncio.Queue(maxsize=PUB_BATCH_SIZE)
    # create a semaphore to limit the number of queue items being processed at a time
    semaphore = asyncio.Semaphore(PUB_BATCH_SIZE)
    # spawn a task to process the publisher queue
    asyncio.create_task(process_publisher_queue(publisher_queue, semaphore))
    for pub_url, publisher in DB_PUBS.items():
        await semaphore.acquire()
        print(f"Adding publisher {pub_url} to queue")
        await publisher_queue.put(publisher)

if __name__ == '__main__':
    asyncio.run(main())
But then I thought to myself, "why am I even queueing?" So I changed it to:
from scraping import xpath_soup, get_page_content, links_from_soup
import asyncio
from sqlitedict import SqliteDict
from bs4 import BeautifulSoup

DB_PUBS = SqliteDict("data/publishers.sqlite")
PUB_BATCH_SIZE = 10

async def process_publisher(publisher, semaphore):
    # Scrape all the links from the publisher URL
    page_src = await get_page_content(publisher['url'])
    if page_src is not None:
        soup = BeautifulSoup(page_src, 'html.parser')
        page_links = links_from_soup(soup, publisher['url'])
        print(f"Found {len(page_links)} links for publisher {publisher['url']}")
    semaphore.release()

async def main():
    # create a semaphore to limit the number of items being processed at a time
    semaphore = asyncio.Semaphore(PUB_BATCH_SIZE)
    for pub_url, publisher in DB_PUBS.items():
        await semaphore.acquire()
        print(f"Spawning task for publisher {pub_url}")
        asyncio.create_task(process_publisher(publisher, semaphore))

if __name__ == '__main__':
    asyncio.run(main())
Then I asked ChatGPT whether this was a good idea, and it said:
While it is technically possible to remove the queue and spawn a task for each publisher directly, that may not be the most efficient way to process the publishers. Using an asyncio queue has some benefits, such as letting you cap the maximum number of items being processed at once and providing a buffer in case the producer and consumer run at different speeds.

With a queue, you can add new publishers without worrying about how quickly they are processed, since they will be queued and handled in first-in, first-out order. If you spawn a task for each publisher directly, you risk creating too many tasks, consuming too much memory, and overwhelming the system.

Additionally, a queue makes it easy to modify the program to use multiple consumer tasks. This can improve performance by letting several tasks work on the queue at the same time, reducing overall processing time.

Overall, while removing the queue is technically possible, it may not be the most efficient way to process the publishers. Using a queue gives you more control and flexibility, which can be useful in many scenarios.
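For reference, the "multiple consumer tasks" pattern ChatGPT alludes to looks roughly like the following self-contained sketch, where a fixed pool of workers drains one queue (a dummy doubling step stands in for real page processing), so concurrency is bounded by the pool size rather than a semaphore:

```python
import asyncio

async def worker(queue, results):
    # Each worker repeatedly pulls items from the shared queue.
    while True:
        item = await queue.get()
        try:
            # Placeholder for real per-item work (e.g. fetching a page).
            results.append(item * 2)
        finally:
            queue.task_done()

async def main(items, n_workers=3):
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results))
               for _ in range(n_workers)]
    for item in items:
        await queue.put(item)
    await queue.join()   # wait until every item has been processed
    for w in workers:
        w.cancel()       # workers loop forever, so cancel them explicitly
    return results

print(sorted(asyncio.run(main(range(5)))))  # → [0, 2, 4, 6, 8]
```

At most `n_workers` items are in flight at any moment, which is the same bound the semaphore gives you, just expressed as a pool size.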
But the semaphore already caps the number of items being processed at once. My thinking is that if I had a second awaitable, say processing each page after fetching it, then maybe I would reach for a queue consumer. This script will certainly grow. I'm not overlooking anything here, am I?
If you want to waste as little time as possible, the best approach is to use a single semaphore and to hold it only during the retrieval phase. The processing phase can be kept separate and does not need to be gated by the semaphore.
Another thing to note: the recommended way to use a semaphore is the context manager (`async with`) rather than explicit `acquire()` and `release()` calls.
async def gather(url):
    # Make sure to use sessions here to take
    # advantage of HTTP Keep-Alive connections.
    ...

async def process(payload):
    ...

async def task(publisher, semaphore):
    # Gather the publisher information.
    # Ensure you are only doing it N at a time.
    url = publisher['url']
    async with semaphore:
        payload = await gather(url)
    processed_payload = await process(payload)
    # Do something with the newly processed payload here.

async def main():
    N = 10
    sem = asyncio.Semaphore(N)
    tasks = [task(publisher, sem) for publisher in DB_PUBS.values()]
    await asyncio.gather(*tasks)
Note that even with the semaphore in place, you can still have many requests in flight to the same server at once, especially with N=10.