我正在开发一个项目,需要跟踪与流程相关的事件。就我而言,我有
Registration
和 RegistrationEvent
模型,后者通过外键连接到 Registration
。
我还编写了一个名为
RegistrationEvent
的 _ensure_correct_flow_of_events
方法,它可以防止以无意义的顺序添加事件,并在调用 model.save
时调用。事实上,事件的流程一定是SIGNED -> STARTED -> SUCCESS -> CERTIFICATE_ISSUED
。事件CANCELED
随时可能发生。
为了评估事件序列,此方法调用另一个方法 _get_previous_event
,该方法返回注册到 Registration
的最后一个事件。
创建
SUCCESS
事件后,save
方法调用 Registration.threaded_issue_certificate
,该方法应该创建证书,然后在新线程中创建 CERTIFICATE_ISSUED
事件,以便快速处理响应。问题是,当即将创建 CERTIFICATE_ISSUED
和 _ensure_correct_flow_of_events
时,_get_previous_event
不会返回刚刚创建的 _get_previous_event
事件,而是返回之前的 SUCCESS
事件。我的日志是
STARTED
这是为什么?
我将我的
Checking correct flow, previous event: Course started - admin registration id: 1 current event_type: 3
Checking correct flow, previous event: Course started - admin registration id: 1 current event_type: 4
Exception in thread Thread-2 (threaded_issue_certificate):
Traceback (most recent call last):
File "/Users/zenodallavalle/miniconda3/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/Users/zenodallavalle/miniconda3/lib/python3.10/threading.py", line 953, in run
[21/Mar/2024 15:16:41] "POST /admin/main/registrationevent/add/ HTTP/1.1" 302 0
self._target(*self._args, **self._kwargs)
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 79, in threaded_issue_certificate
return self.issue_certificate()
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 84, in issue_certificate
RegistrationEvent.objects.create(
File "/Users/zenodallavalle/Downloads/test/env/lib/python3.10/site-packages/django/db/models/manager.py", line 87, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "/Users/zenodallavalle/Downloads/test/env/lib/python3.10/site-packages/django/db/models/query.py", line 679, in create
obj.save(force_insert=True, using=self.db)
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 175, in save
self._ensure_correct_flow_of_events(is_new=is_new)
File "/Users/zenodallavalle/Downloads/test/main/models.py", line 155, in _ensure_correct_flow_of_events
raise ValueError(
ValueError: After started next event must be 'success' or 'canceled'
留在这里以便复制该行为。
models.py
所需的进口是:
from django.db import models
from django.contrib.auth.models import User
from logging import getLogger
from main.utils import make_thread
logger = getLogger(__name__)
import threading
def make_thread(fn):
def _make_thread(*args, **kwargs):
thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
thread.start()
return thread
return _make_thread
class Registration(models.Model):
course_user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name="registrations",
)
created_by = models.ForeignKey(
User,
null=True,
on_delete=models.SET_NULL,
)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def _check_user_not_signed_for_other_courses(self):
other_registrations = self.course_user.registrations.exclude(pk=self.pk)
if any([not r.ended for r in other_registrations]):
raise ValueError("User already signed for another course")
def _ensure_created_by_is_not_null(self):
if self.created_by is None:
raise ValueError("Created by is null")
def save(self, *args, **kwargs) -> None:
is_new = self._state.adding
if is_new:
self._ensure_created_by_is_not_null()
self._check_user_not_signed_for_other_courses()
ret = super().save(*args, **kwargs)
if is_new:
RegistrationEvent.objects.create(
course_registration=self,
event_type=RegistrationEvent.EventType.SIGNED,
)
return ret
def __str__(self):
return f"{self.course_user} registration id: {self.pk}"
def __repr__(self):
return f"<Registration: {self.course_user} registration id: {self.pk}>"
@property
def ended(self):
return self.events.filter(
event_type__in=(
RegistrationEvent.EventType.CERTIFICATE_ISSUED,
RegistrationEvent.EventType.CANCELED,
RegistrationEvent.EventType.FAILED,
)
).exists()
@property
def last_event(self):
return self.events.order_by("-created_at").first()
@make_thread
def threaded_issue_certificate(self):
return self.issue_certificate()
def issue_certificate(self):
# Do something here
# Register it as an event
RegistrationEvent.objects.create(
course_registration=self,
event_type=RegistrationEvent.EventType.CERTIFICATE_ISSUED,
)
class RegistrationEvent(models.Model):
class EventType(models.IntegerChoices):
SIGNED = 1, "Signed up"
STARTED = 2, "Course started"
SUCCESS = 3, "Course success"
CERTIFICATE_ISSUED = 4, "Certificate issued"
CANCELED = 5, "Cancelled"
FAILED = 6, "Course failed"
course_registration = models.ForeignKey(
Registration,
on_delete=models.CASCADE,
related_name="events",
)
event_type = models.IntegerField(choices=EventType.choices)
created_at = models.DateTimeField(auto_now_add=True, verbose_name="Creato il")
updated_at = models.DateTimeField(auto_now=True, verbose_name="Aggiornato il")
@property
def event_type_description(self):
return self.EventType(self.event_type).label
def __str__(self):
return f"{self.event_type_description} - {self.course_registration}"
def __repr__(self):
return f"<RegistrationEvent: {self.event_type_description} - {self.course_registration}>"
def _get_previous_event(self, is_new):
qs = self.course_registration.events.all()
if not is_new:
qs = qs.exclude(created_at__gte=self.created_at)
return qs.order_by("-created_at").first()
def _ensure_correct_flow_of_events(self, is_new):
# If the course is completed (certificate issued or cancelled), no more events can be added
if is_new:
if self.course_registration.ended:
raise ValueError(
"Il corso è completato, non è possibile aggiungere eventi"
)
if self.event_type == self.EventType.CANCELED:
return # No further checks needed
previous_event = self._get_previous_event(is_new=is_new)
print(
"Checking correct flow, previous event:",
previous_event,
"current event_type:",
self.event_type,
)
if not previous_event:
if self.event_type != self.EventType.SIGNED:
raise ValueError("First event must be 'signed'")
elif previous_event.event_type == self.EventType.SIGNED:
if self.event_type != self.EventType.STARTED:
raise ValueError("After signed next event must be 'started'")
elif previous_event.event_type == self.EventType.STARTED:
if self.event_type not in (
self.EventType.SUCCESS,
self.EventType.CANCELED,
):
raise ValueError(
"After started next event must be 'success' or 'canceled'"
)
elif previous_event.event_type == self.EventType.SUCCESS:
if self.event_type != self.EventType.CERTIFICATE_ISSUED:
raise ValueError(
"After success next event must be 'certificate issued'"
)
def _issue_certificate_if_needed(self, is_new):
if not is_new:
return
if not self.event_type == self.EventType.SUCCESS:
return
self.course_registration.threaded_issue_certificate()
def save(self, *args, **kwargs):
is_new = self._state.adding
if is_new:
self._ensure_correct_flow_of_events(is_new=is_new)
ret = super().save(*args, **kwargs)
self._issue_certificate_if_needed(is_new=is_new)
return ret
实现的方法类似于:
from django.db.models.signals import post_save
from django.dispatch import receiver
如果您不想采用这种方法,还有其他方法可以做到。我也看到过使用事务模块的解决方案。