如何在Celery Task中从Django settings.py中检索动态值?

问题描述 投票:0回答:1

我的Django依靠Celery和Celerybeat生成任务。我有3个systemd服务/单元:

  1. myapp.service(gunicorn守护程序)
  2. celery-myapp.service(芹菜工作者)
  3. celerybeat-myapp.service(celerybeat守护程序)

我在主“ myapp.service”服务(系统单元)中定义了一个环境变量MY_SECRET。

我无法从settings.py或任务中的环境变量(tasks.py)中直接获取此值,但是我可以直接使用settings.py或环境变量从视图(views.py)中平稳地获取此值。

我是否需要将myapp.service中定义的环境变量MY_SECRET复制到celery-myapp.service和celerybeat-myapp.service,以便可以从Celery任务中获取此值?我如何检索我的芹菜任务MY_SECRET值?

myapp.service

[Unit]
Description=My App
After=network.target
Wants=celery-myapp.service
Wants=celerybeat-myapp.service

[Service]
User=myapp
Group=nginx
Environment=MY_SECRET=1234
WorkingDirectory=/opt/myproject/
ExecStart=/opt/myproject/env/bin/gunicorn --workers 3 --log-level debug --bind unix://opt/myproject/myapp.sock myaproject.wsgi:application

[Install]
WantedBy=multi-user.target

在我的Django项目'settings.py'中引用了环境变量以及我打算在应用程序中使用的其他设置:

settings.py

# Populated with environment variable defined by systemd unit
MY_SECRET = os.environ.get('MY_SECRET')
# Static value
MY_URL= 'http://127.0.0.1'

在我看来,我可以从settings.py或直接从环境中获取定义为环境变量MY_SECRET的值:

views.py

def my_view(request):
    from django.conf import settings as project_settings

    my_secret_env = os.environ.get('MY_SECRET')
    my_secret_setting = project_settings.MY_SECRET
    my_url_setting = project_settings.MY_URL

   return render(request,'mypage.html',{
       'my_secret_env': my_secret_env,
       'my_secret_setting': my_secret_setting,
       'my_url_setting': my_url_setting,
   )

mypage.html

    {% block page_content %}
    <!-- Value displayed properly -->
    <p><strong>my_secret_env:</strong> {{ my_secret_env }}</p>
    <!-- Value displayed properly -->
    <p><strong>my_secret_setting:</strong> {{ my_secret_setting }}</p>
    <!-- Value displayed properly -->
    <p><strong>my_url_setting:</strong> {{ my_url_setting }}</p>
    {% endblock %}

我无法从settings.py或直接从环境中获取定义为环境变量MY_SECRET的值。

另一方面,可以正确地检索来自settings.py的静态值:

tasks.py

@shared_task
def task_mytask():
    from django.conf import settings as project_settings

    my_secret_env = os.environ.get('MY_SECRET')
    my_secret_setting = project_settings.MY_SECRET
    my_url_setting = project_settings.MY_URL

    # No value displayed
    print(my_secret_env)
    # No value displayed
    print(my_secret_setting)
    # Value displayed properly
    print(my_url_setting)

@periodic_task(run_every=timedelta(seconds=60))
def task_myperiodictask():
    task_mytask.delay()
django celery systemd
1个回答
0
投票

task.py

@shared_task() def task_mytask(my_secret_env, my_secret_setting): ...

views.py

task_mytask.apply_async(args=[my_secret_env, my_secret_setting])

© www.soinside.com 2019 - 2024. All rights reserved.