从eventstore重播特定类型的事件

问题描述 投票:1回答:2

我目前正在使用Event Store来处理我的活动。我目前需要重播特定类型的事件,因为我已经对订阅和写入数据库的方式进行了更改。

这可能吗?如果是这样,怎么办呢?谢谢。

event-sourcing get-event-store
2个回答
0
投票

您不能告诉EventStore将特定事件重播到持久订阅,因为持久订阅的目的是为订户保持状态。

要实现这种修复,您真的需要一个追赶应用程序来完成这项工作。

如果您考虑一下,如果您将所有事件重播到新数据库,那么您将拥有正确的数据吗?

所以我有一个控制台应用程序,它重用与持久连接相同的逻辑,但唯一的区别是:

  1. 我更改目标数据库连接字符串 - 所以这将是一个新的数据库或集合(不是破坏的)
  2. 它连接到EventStore并从一开始就重放所有事件
  3. 它将整个数据库重建为正确的状态
  4. 将业务切换到新数据库

这是EventStore的重点 - 您只需重播所有事件即可随时构建任何数据库,这将是正确的

持久连接处理新的传入事件并应用更新。


0
投票

如果启用$ by_event_type投影,则可以访问该投影流

/流/ $ {ET-事件类型}

https://eventstore.org/docs/projections/system-projections/index.html

然后你可以使用.net api阅读它。

这里有一些代码可以帮助您入门

      private static T GetInstanceOfEvent<T>(ResolvedEvent resolvedEvent) where T : BaseEvent
            {

                var metadataString = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata);
                var eventClrTypeName = JObject.Parse(metadataString).Property(EventClrTypeHeader).Value;
                var @event = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(resolvedEvent.Event.Data), Type.GetType((string) eventClrTypeName));
                if (!(@event is BaseEvent))
                {
                    throw new MessageDeserializationException((string) eventClrTypeName, metadataString);
                }

                return @event as T;
            }

            private static IEventStoreConnection GetEventStoreConnection()
            {
                var connectionString = System.Configuration.ConfigurationManager.ConnectionStrings["EventStore"].ConnectionString;
                var connection = EventStoreConnection.Create(connectionString);
                connection.ConnectAsync().Wait();
                return connection;
            }

            private static string GetStreamName<T>() where T : BaseEvent
            {
                return "$et-" + typeof(T).Name;
            }

要阅读事件,您可以使用此代码段

 StreamEventsSlice currentSlice;
            long nextSliceStart = StreamPosition.Start;
            const int sliceCount = 200;

            do
            {
                currentSlice = await esConnection.ReadStreamEventsForwardAsync(streamName, nextSliceStart, sliceCount, true);

                foreach (var @event in currentSlice.Events)
                {
                    var myEvent = GetInstanceOfEvent<OrderMerchantFeesCalculatedEvent>(@event);


                    TransformEvent(myEvent);
                }

                nextSliceStart = currentSlice.NextEventNumber;

            } while (currentSlice.IsEndOfStream == false);