具有域事件的Kafka

问题描述 投票:0回答:1

在事件驱动的项目中,我有Commands类型的消息,而在响应中,我有Events

这些CommandsEvents消息表示域,因此它们包含域中的复杂类型。

示例:

RegisterClientCommand(Name, Email)

ClientRegisteredEvent(ClientId)

域中还有更多的这些命令和事件对。

我在想类似的东西:

RawMessage(payloadMap, sequenceId, createdOn)

有效负载将保存消息域类类型名称和消息字段。

我也在阅读有关Avro格式的信息,但是似乎为定义每条消息的消息格式进行了大量工作。

关于通过Kafka经纪人实际传输的消息格式的最佳实践是什么?

java spring-boot apache-kafka confluent event-driven-design
1个回答
0
投票

没有唯一的“最佳”方法,这完全取决于您的团队/组织的专业知识以及项目的特定要求。

Kafka本身对消息实际包含的内容无动于衷。在大多数情况下,它只是将消息值和键视为不透明的字节数组。

无论您最终如何将RawMessage定义为Java端,都必须将其序列化为字节数组才能将其生成到Kafka中,因为KafkaProducer就是这样。也许这是您已经拥有的自定义字符串序列化程序,也许您可​​以使用Jackson或类似方法将POJO序列化为JSON。或者,也许您只是发送一个巨大的逗号分隔的字符串作为消息。这完全取决于您。

重要的是,使用者在从kafka主题中提取消息时,能够正确,可靠地从消息中的每个字段读取数据,而不会出现任何错误,版本冲突等。大多数serde / schema机制可以存在,例如Avro,Protobuf或Thrift,请尝试简化您的工作。特别复杂的事情,例如确保新消息与同一消息的先前版本向后兼容。

  • 大多数人最终会合并以下各项:
    • 用于创建要生成到Kafka的字节数组的Serde机制,一些流行的机制是AvroProtobufThrift
    • 原始JSON字符串
    • 一个巨大的字符串,具有某种内部/自定义格式,可以对其进行分析/分析。
  • 某些公司使用集中式架构服务。这样一来,您的数据使用者就不必提前知道消息包含的架构,他们只需拉出消息,然后从服务中请求相应的架构即可。 Confluent拥有自己的自定义架构注册表解决方案,该解决方案已经支持Avro数年,并且从几周前开始,现在已经正式支持Protobuf。这是不需要,并且如果您端到端拥有生产者/消费者,则可以决定自己处理序列化,但是很多人已经习惯了。
  • [取决于消息类型,有时您需要压缩,因为消息可能非常重复和/或很大,因此如果发送压缩消息,最终会节省一些存储空间和带宽,但会占用一些CPU资源和延迟。这也可以由您自己在生产者/消费者方面处理,在序列化之后压缩字节数组,或者您可以直接在生产者方面请求消息压缩(在Kafka文档中查找compression.type)。
© www.soinside.com 2019 - 2024. All rights reserved.