我想在 Celery(版本5.3.4)任务中运行 Scrapy 蜘蛛(版本2.10)。此外,我希望我的蜘蛛能够使用多重处理来工作。 我定义了一个 CrawlerProcess 类,如下:
from scrapy.utils.project import get_project_settings
from scrapy.crawler import Crawler
from multiprocessing import Process
from twisted.internet import reactor
from scrapy import signals, crawler
class CrawlerProcess(Process):
def __init__(self, spider_cls, args):
Process.__init__(self)
settings = get_project_settings()
self.crawler = Crawler(spider_cls, settings)
self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
self.args = args
在外部,我定义了一个 Celery 任务,如下:
from aifc import Error
from kombu import Queue
from scrapy import crawler
from twisted.internet import reactor
from celery import Celery
from celery import states
import time
app = Celery("run")
app.config_from_object("scrapyselenium.celery_config")
app.conf.task_queues = (Queue(name='default'))
@app.task(bind=True, name="defaultTask", queue="default")
def scraper(self, args):
self.update_state(state=states.STARTED)
crawler = None
try:
crawler = CrawlerProcess(SiteMonitorSpider, args)
self.update_state(state=states.PENDING)
crawler.start()
crawler.join()
self.update_state(state=states.SUCCESS)
return args
except SoftTimeLimitExceeded:
self.update_state(state=states.REVOKED)
if crawler is not None:
crawler.terminate()
crawler.close()
但是,当我启动 Celery 任务时,出现以下错误:“Can't pickle local object 'Crawler.init.
您能告诉我如何解决这个问题吗?谢谢你。
Celery 序列化函数
args
和 kwargs
,同时通过代理发送它们。因此,它期望每个 arg/kwarg
都是 pickle
或 json
可序列化,具体取决于您使用的序列化程序。默认情况下,最新版本中使用 json
。尝试将序列化器更改为 pickle
,如 follows 并查看它是否有效。如果没有,您可以通过代理传递简单的可序列化对象。