事件采购与事件商店和ORM

问题描述 投票:0回答:1

我建立一个验证服务和Django中的微服务架构。其更多的实验比现实世界实现更加了解事件采购。

我也明白,Django的具有非常再加上框架和使用瓶将是一个更简单的方法,但我试图找出办法解决它的ORM。

用例是非常简单的。用户注册,他将收到一封电子邮件,以激活其账户。用户被激活一次我拍的电子邮件,告知他关于他的帐户被激活。

从我的理解是,在事件源系统。事件被触发和存储,我们可以得到最新的状态,并将其存储在例如数据库中。在我的情况下,将使用Django ORM的Postgres。

我发布事件在Django使用信号卡夫卡。 pre_save()信号。然后该模型将保存对象。

在尚未实现波纹管的更新情况。我只会发布更新领域。并更新在Django的对象。

有谁看到使用这种方法的任何警告或会更好模型的保存方法来实现这一点?

我很想听听你对这些反馈。

# app/services.py

class KafkaService:

    def __init__(self):
        try:
            self.producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKERS,
                                          value_serializer=lambda m: json.dumps(m).encode('ascii'))
        except KafkaError as ke:
            logger.exception(f"Something went wrong creating a kafka producer: {ke}")
            self.producer = None
        except Exception as ex:
            logger.exception(f"Something went wrong creating a kafka producer: {ex}")
            self.producer = None

    def publish(self, topic, key, data):
        if not self.producer:
            logger.error(f"Kafka Producer could not establish a connection")
            pass
        try:
            self.producer.send(topic, key=bytes(key, encoding='utf-8'), value=data)
            self.producer.flush()
            logger.info("Message has been published to Kafka")
        except Exception as ex:
            logger.info(f"Errored while publishing to Kafka: {ex}")
# app/events.py

class UserEvent:

    def __init__(self, event_store):  # event store here is Kafka
        """ A user event class which is an injectable. I am using here.
        I need a key for kafka to place the correct event in the correct partition.

        :parameters:
            - event_store: a class form example :class:`KafkaService` which publishes data
        """
        self.event_store = event_store

    def user_created(self, email, data):
        """ Publish an event to the event store when a user is created

        :param email: string
        :param data: string - json
        """
        self.event_store.publish('user-created', email, data)

    def user_activated(self, email, data):
        """ Publish an event to the event store when a user is activated """
        self.event_store.publish('user-activated', email, data)
# app/signals.py

kafka_service = KafkaService()

user_event = UserEvent(kafka_service)

def user_added_event(sender, instance, created, **kwargs):
    if created:
        from users.api.v1.serializers import UserSerializer  # Avoid (Apps Not Read)

        value = UserSerializer(instance).data
        user_event.user_created(instance.email, value)

    else:
        logger.info("User Updated")
django orm apache-kafka cqrs event-sourcing
1个回答
1
投票

有谁看到使用这种方法的任何警告或会更好模型的保存方法来实现这一点?

通常的设计分离持久性的担忧域模型(又名业务逻辑)。

如果事件的顺序是你的真理之源,两件事情如下:首先,你要确保你成功地存储您的事件发布之前。其次,你要确保你写的语义是“第一写入胜”。

如果你是正确的,而且每个人都明白,在数据库中的模型可以是“幕后黑手”,在活动时间内商店的事件,那么你是在良好的状态。

最终一致的系统使“读自己写”预期的挑战性。所以,你可能有一些额外的工作存在。

© www.soinside.com 2019 - 2024. All rights reserved.