我打算使用RailsEventStore将持久的读取模型投影到关系数据库中。
为了使之成为可能,我需要输入顺序固定且没有重复事件的输入流。因此,我构建了一个链接器,以侦听所有事件并将与我的投影相关的那些事件链接到单独的流中。
现在,我想将给定读取模型的构建器注册为该流的订阅者。但是我找不到订阅特定流的方法。
这甚至可能吗?如果是这样,怎么办?
无法从选定的流中订阅事件(但是这个想法似乎很有趣)。您需要实现构建器以读取给定的流并定期处理事件。它需要存储上次处理的事件ID,下次运行时,它应仅读取链接到流的新域事件。
(伪)代码可能看起来像:
class Builder
def initialize(event_store)
@event_store = event_store
@last_processed_event_id = read_last_processed_event
end
def call(stream)
events = read_all_since(stream, event_id)
process(events)
store_last_processed_event
end
private
def read_all_since(stream, id)
@event_store.read.stream(stream).from(id).to_a
end
def process(events)
events.each do |event|
#... do sth here
@last_processed_event_id = event.event_id
end
end
def load_last_processed_event
# read last processed event or return nil to start from beginning of stream
end
def store_last_processed_event
# store it somewhere
end
end