随着我的在线申请。
再由应用消费者接收消息,并转发给感兴趣的处理者。
命令示例
RegisterClientCommand
PlaceBookingCommand
CancelBookingCommand
事件实例
ClientRegisteredEvent
BookingPlacedEvent
BookingCancelledEvent
命令和事件的类型都是 DomainMessage
每个DomainMesage都有一个 sequence-number
, createdOn
领域
我决定使用支持模式版本的Avro来进行消息交换。
Spring Kafka生产者。
ProducerFactory<String?, GenericRecord?>
Spring Kafka Consumer:
ConsumerFactory<String?, GenericRecord?>
点(1)当Kafka消息被发送时,会创建一个GenericRecord,如下所示。
return GenericRecordBuilder(schema).apply {
set("first", registerClientCommand.first)
set("last", registerClientCommand.last)
set("email", registerClientCommand.email)
set("mobile", registerClientCommand.mobile)
}.build()
点(2)当收到消息时。 - 从GenericRecord建立一个Map--并使用Map中的字段值创建一个具体的RegisterClientCommand。
return RegisterClientCommand(
fields.get("first"),
fields.get("last"),
fields.get("email"),
fields.get("mobile")
)
你觉得这样做对吗?
我不喜欢这样的事实:对于每个DomainMessage(Commands,Events),我都要
我使用Avro的方式是否正确?
使用Spring Kafka库怎么样?这可以处理各种转换和订阅者。 你需要提供一个Bean的 RecordMessageConverter
这个bean必须实现简单的Avro转换,将POJO转换为Avro,将Avro消息转换为POJO。
消息推送也可以被简化,你需要提供bean Topic
消息发送将是这样的
@Service
public class Kafka {
@Autowired
private KafkaTemplate<Object, Object> template;
public void sendClass1(Class1 object) {
this.template.send("topic1", object);
}
}