在 Celery 任务中运行 Scrapy 蜘蛛

问题描述 投票:0回答:5

这不再起作用了,scrapy 的 API 已更改。

现在文档提供了一种“从脚本运行 Scrapy”的方法,但我收到了

ReactorNotRestartable
错误。

我的任务:

from celery import Task

from twisted.internet import reactor

from scrapy.crawler import Crawler
from scrapy import log, signals
from scrapy.utils.project import get_project_settings

from .spiders import MySpider



class MyTask(Task):
    def run(self, *args, **kwargs):
        spider = MySpider
        settings = get_project_settings()
        crawler = Crawler(settings)
        crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
        crawler.configure()
        crawler.crawl(spider)
        crawler.start()

        log.start()
        reactor.run()
scrapy twisted celery
5个回答
42
投票

扭曲的反应堆无法重新启动。解决这个问题的方法是让 celery 任务为您想要执行的每个爬网创建一个新的子进程,如下面的文章中所建议的:

这通过利用

multiprocessing
包解决了“反应堆无法重新启动”的问题。但问题是,最新的 celery 版本的解决方法现在已经过时了,因为您将遇到另一个问题,即守护进程无法生成子进程。因此,为了使解决方法发挥作用,您需要使用 celery 版本。

是的,并且

scrapy
API 已更改。但稍作修改(
import Crawler
而不是
CrawlerProcess
)。您可以通过使用 celery 版本来解决此问题。

Celery 问题可以在这里找到: 芹菜问题#1709

这是我的更新的爬行脚本,它可以通过使用billiard

而不是
multiprocessing
来与较新的芹菜版本一起使用:

from scrapy.crawler import Crawler from scrapy.conf import settings from myspider import MySpider from scrapy import log, project from twisted.internet import reactor from billiard import Process from scrapy.utils.project import get_project_settings from scrapy import signals class UrlCrawlerScript(Process): def __init__(self, spider): Process.__init__(self) settings = get_project_settings() self.crawler = Crawler(settings) self.crawler.configure() self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed) self.spider = spider def run(self): self.crawler.crawl(self.spider) self.crawler.start() reactor.run() def run_spider(url): spider = MySpider(url) crawler = UrlCrawlerScript(spider) crawler.start() crawler.join()

编辑:通过阅读芹菜问题#1709,他们建议使用台球而不是多处理,以解除子进程限制。换句话说,我们应该尝试台球,看看它是否有效!

编辑2:是的,通过使用billiard,我的脚本可以与最新的芹菜版本一起使用!请参阅我更新的脚本。


15
投票
Twisted 反应堆无法重新启动,因此一旦一个蜘蛛完成运行并且

crawler

 隐式停止反应堆,该工作人员就毫无用处。

正如另一个问题的答案中所发布的,您所需要做的就是杀死运行蜘蛛的工作人员并用新的工作人员替换它,这可以防止反应堆多次启动和停止。为此,只需设置:

CELERYD_MAX_TASKS_PER_CHILD = 1

缺点是你并没有真正

使用Twisted反应器的全部潜力,并且浪费了运行多个反应器的资源,因为一个反应器可以在一个进程中同时运行多个蜘蛛。更好的方法是每个工作人员运行一个反应器(甚至全局一个反应器),并且不要让crawler

碰它。

我正在为一个非常相似的项目做这件事,所以如果我取得任何进展,我会更新这篇文章。


2
投票
为了避免在 Celery 任务队列中运行 Scrapy 时出现 ReactorNotRestartable 错误,我使用了线程。使用相同的方法在一个应用程序中多次运行 Twisted Reactor。 Scrapy也使用了Twisted,所以我们也可以这样做。

这是代码:

from threading import Thread from scrapy.crawler import CrawlerProcess import scrapy class MySpider(scrapy.Spider): name = 'my_spider' class MyCrawler: spider_settings = {} def run_crawler(self): process = CrawlerProcess(self.spider_settings) process.crawl(MySpider) Thread(target=process.start).start()

不要忘记增加 celery 的 CELERYD_CONCURRENCY 。

CELERYD_CONCURRENCY = 10

对我来说效果很好。

这不会阻塞进程运行,但无论如何 scrapy 的最佳实践是在回调中处理数据。只需这样做:

for crawler in process.crawlers: crawler.spider.save_result_callback = some_callback crawler.spider.save_result_callback_params = some_callback_params Thread(target=process.start).start()
    

0
投票
这对我有用,受到

这个答案的启发:

from scrapy.settings import Settings from scraper.scraper import settings as scraper_settings from celery import signals, Celery import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings") app = Celery("enrichment") @signals.worker_process_init.connect def configure_infrastructure(**kwargs): from twisted.internet import asyncioreactor asyncioreactor.install() # TWISTED_REACTOR setting in scraper/scraper/settings.py from crochet import setup setup() @app.task() def do_scraping(): crawler_settings = Settings() crawler_settings.setmodule(scraper_settings) runner = CrawlerRunner(settings=crawler_settings) runner.crawl("spider_name", url="some_url")
    

-2
投票
我想说,如果您有很多任务需要处理,这种方法效率非常低。 因为 Celery 是线程化的 - 在它自己的线程中运行每个任务。 假设使用 RabbitMQ 作为代理,您可以传递 >10K q/s。 对于 Celery,这可能会导致 10K 线程开销! 我建议不要在这里使用芹菜。而是直接访问经纪人!

© www.soinside.com 2019 - 2024. All rights reserved.