Apscheduler 正在多次执行作业

问题描述 投票:0回答:3

我有一个使用 uwsgi(有 10 个工作进程)+ ngnix 运行的 django 应用程序。我正在使用 apscheduler 进行调度。每当我安排一项作业时,它就会被执行多次。从这些答案ans1ans2我知道这是因为调度程序是在uwsgi的每个worker中启动的。我按照此answer中的建议通过将调度程序绑定到套接字并通过在数据库中保留状态来对调度程序进行条件初始化,以便仅启动一个调度程序实例,但仍然存在相同的问题,有时也存在创建作业时,发现调度程序未运行,并且作业保持挂起状态且未执行。

我正在使用以下代码在 django 应用程序的 url 中初始化 apscheduler。这将在应用程序启动时启动调度程序。

def job_listener(ev):
    print('event',ev)


job_defaults = {
    'coalesce': True,  
    'max_instances': 1
}

scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=TIME_ZONE, daemon=False)
scheduler.add_jobstore(MongoDBJobStore(client=client), 'default')
scheduler.add_executor(ThreadPoolExecutor(), 'default')
scheduler.add_executor(ProcessPoolExecutor(),'processpool')
scheduler.add_listener(job_listener)


def initialize_scheduler():
    try:
        if scheduler_db_conn.find_one():
            print('scheduler already running')
            return True
        scheduler.start()
        scheduler_db_conn.save({'status': True})
        print('---------------scheduler started --------------->')
        return True
    except:
        return False

我使用以下代码来创建作业。

from scheduler_conf import scheduler
def create_job(arg_list):
    try:
        print('scheduler status-->',scheduler.running)
        job = scheduler.add_job(**arg_list)
        return True
    except:
        print('error in creating Job')
        return False

我无法正确配置和运行调度程序。我已经参考了 apschedule 中的所有线程,但仍然没有找到解决方案。

  • 如果我不限制每个工作线程中运行多个调度程序,则作业会执行多次。
  • 但是,如果我限制在一个工作进程中仅运行一个调度程序,则某些作业将保持挂起状态而不执行。

这个问题有什么解决办法吗?

django uwsgi apscheduler
3个回答
7
投票

让我们考虑以下事实:

(1) 默认情况下,UWSGI 会在分叉其工作进程之前将 Django 应用程序预加载到 UWSGI 主进程的内存中。

(2) UWSGI 从 master 中“分叉”worker,这意味着它们本质上被复制到每个worker的内存中。由于

fork()

 的实现方式,子进程(即工作进程)不会继承父进程的线程。

(3) 当您调用

BackgroundScheduler.start()

 时,会创建一个线程,负责在任何工作线程/主线程调用此函数时执行作业。

您必须做的就是在创建任何工作进程之前

在主进程上调用BackgroundScheduler.start()

。通过这样做,当创建工人时,他们将不会继承BackgroundScheduler线程(上面的#2),因此不会执行任何作业(但他们仍然可以通过与作业库通信来调度/修改/删除作业!)。
为此,只需确保在实例化应用程序的任何函数/模块中调用

BackgroundScheduler.start()

即可。例如,在以下 Django 项目结构中,我们(可能)希望在

wsgi.py
中执行此代码,这是 UWSGI 服务器的入口点。:

mysite/ manage.py mysite/ __init__.py settings.py urls.py wsgi.py

陷阱:

不要

“在 django 应用程序的 url 中初始化 [e] apscheduler...这将在应用程序启动时启动调度程序。”

这些可能由每个工作线程加载,因此 start() 会被执行多次。


不要以“lazy-app

”模式启动 UWSGI 服务器,这将在创建每个工作人员后在每个工作人员中加载应用程序。

不要使用默认(内存)作业存储运行BackgroundScheduler。这将在所有工人之间造成裂脑综合症。您希望对作业上执行的所有 CRUD 操作强制执行单点事实,就像使用 MongoDB 一样。

这篇文章

可能会为您提供更多详细信息,但仅在 Gunicorn(WSGI 服务器)环境中。


0
投票

from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler()

这将为您提供一个带有名为“default”的 MemoryJobStore 的 BackgroundScheduler 和一个名为“default”的 ThreadPoolExecutor,默认最大线程数为 10。
现在,假设您想减少线程数量,并且还想调整新作业的默认值并设置不同的时区。以下示例将帮助您:

from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(job_defaults=job_defaults, timezone=utc)

最简单、最容易理解的问题解决方案。
例如项目结构:

0
投票
project/ core/ wsgi.py sheduler.py # add this file ... app/ sheduler_tasks.py # add this file signals.py ...

# sheduler.py from apscheduler.schedulers.background import BackgroundScheduler from django_apscheduler.jobstores import DjangoJobStore # Nessesary for multi-threading projects (if you use BackgroundScheduler). # If this func is called in the thread in which the main scheduler was started, # then it will not affect anything. If it is called in another thread, # it will start the scheduler only as a executor. # Use this func in any place where you want to add task. # In particular, it is only necessary in other theads. def prepare_scheduler(): if not scheduler.running: scheduler.start(paused=True) # only registrator, not executor scheduler = BackgroundScheduler() scheduler.add_jobstore(DjangoJobStore(), 'default')
# wsgi.py  (or another entry-point file of your framework)

# Your code ...

from .scheduler import scheduler

scheduler.start()  # registrator & exeсuter
# sheduler_tasks.py

from core.scheduler import scheduler, prepare_scheduler


def sheduler_task():
    prepare_scheduler() 
    scheduler.add_job(your_func, trigger=any_trigger)
# signals.py

# I took this file as an example 
# because signals usually run in a different thread

from django.db.models.signals import any_signal
from django.dispatch import receiver

from .models import YourModel
from .scheduler_tasks import sheduler_task

     
@receiver(any_signal, sender=YourModel)
def add_task(**kwargs):
    sheduler_task()
现在 sheduler 作为执行器只运行一次(wsgi.py 只运行一次(因为它是入口点文件),与其他文件不同)并且与项目并行工作。
同时,在我们的主调度程序未启动的其他线程中,新的调度程序将启动,但仅作为任务注册程序。
我认为问题已经解决了。
    

© www.soinside.com 2019 - 2024. All rights reserved.