如何使用C#Rx实现Redis流

问题描述 投票:0回答:3

因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个流内容,但是我遇到了一些问题,也许你们中的一些人可以将我指向正确的位置。

该实现结合使用Pub / Sub和流:*日志->流通道* log:notification-> pub / sub* log:lastReadMessage->包含流中的最后一个读取键

发布者

        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);

            while(true)
            {
                var value =  new NameValueEntry[]
                {
                    new NameValueEntry("id", Guid.NewGuid().ToString()),
                    new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
                };

                redisDb.StreamAdd("log", value);
                var publisher = connectionMultiplexer.GetSubscriber();
                publisher.Publish("log:notify", string.Empty, CommandFlags.None);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }

订阅者

        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);


            var observableStream =  CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
                .Subscribe(x => {
                  Console.WriteLine(x);  
                });

            Console.ReadLine();
        }
        private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }


            return Observable.Create<string>(obs => 
            {
                var subscriber = connection.GetSubscriber();
                subscriber.Subscribe($"{channel}:notify", async (ch, msg) => 
                {
                    var locker = await taskFromStreamBlocker
                        .WaitAsync(0)
                        .ConfigureAwait(false);

                    if (!locker)
                    {
                        return;
                    }

                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    taskFromStreamBlocker.Release();
                });

                return Disposable.Create(() => subscriber.Unsubscribe(channel));
            });
        }

为什么要使用信号灯?

因为我可以向流中添加很多消息,并且我不希望o将同一条消息处理两次。

问题

  1. 如果流中有未处理的消息,如何在没有发布/订阅事件的情况下进行处理当我们开始时,我们可以验证它是否为未处理的消息并对其进行处理。如果在这段时间中有新消息添加到流中,而我们仍未订阅发布/订阅,则订阅者将不会处理该消息,直到我们通过发布/订阅收到通知。

  2. 信号量对于不两次处理同一条消息很重要,但同时也是一种诅咒。在消息处理期间,可以将另一个消息添加到流中。发生这种情况时,订户将不会立即处理,而只会在下次收到通知时处理(此时将处理两条消息)。

您将如何实施?是否仅使用Rx实现Redis流?该解决方案不应使用某种循环,并且应提高内存效率。这可能吗?

最诚挚的祝福

Paulo Aboim Pinto

c# redis system.reactive
3个回答
0
投票

这是我要避免的WHILE解决方案

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            return Observable.Create<string>(async obs => 
            {
                while(!cancellationToken.IsCancellationRequested)
                {
                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    await Task.Delay(TimeSpan.FromMilliseconds(500));
                }

                return Disposable.Empty;
            });
        }

0
投票

这是使用具有200ms经过时间的计时器的另一种解决方案


        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            var instance = ThreadPoolScheduler.Instance;

            return Observable.Create<string>(obs => 
            {
                var disposable = Observable
                    .Interval(TimeSpan.FromMilliseconds(200), instance)
                    .Subscribe(async _ => 
                    {
                        var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                        foreach(var message in messages)
                        {
                            obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                            lastReadMessage = message.Id;
                        }

                        redisDb.KeyDelete($"{channel}:lastReadMessage");
                        redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                    });
                cancellationToken.Register(() => disposable.Dispose());

                return Disposable.Empty;    
            });
       }


0
投票

我使用紧密循环,只是做一个XRange并保存一个位置-KISS ..但是,如果没有工作,它会退后,因此在有很多紧密循环时,它会非常快。

[如果您需要更高的性能,例如在处理时阅读,但是在大多数情况下,我会警告您不要这样做。

  1. 它带来了很多复杂性,需要坚如磐石。
  2. Redis通常足够快
  3. “我不希望o将同一封邮件处理两次。”几乎每个系统都至少有一次交付,消除崩溃周围的故障是令人难以置信的困难/缓慢。您可以使用id的哈希集将其部分删除,但是对于消费者来说,处理它和设计为幂等的消息非常简单。这可能是消息设计问题的根本原因。如果对每个读取器进行分区(单独的流和每个流1个工作器),则可以将哈希集保留在内存中,以避免扩展/分布式问题。请注意,Redis流可以保留顺序,使用它可以生成更简单的幂等消息。
  4. 例外,您不希望停止处理流,因为使用者对一条消息有逻辑异常,例如,在整个系统已停止的晚上接到电话,锁使情况变得更糟。事件数据无法更改,因此请尽力而为。但是,红外/ redis异常确实需要引发并重试。在循环之外进行管理非常痛苦。
  5. 简单的背压。如果您不能足够快地处理工作,则循环会变慢,而不是创建大量任务并消耗所有内存。

我不再使用分布式锁/信号量。

如果您处理命令(例如,执行某事而不是xyz),则这些操作可能会失败。再次,用户应处理已经发生的情况,而不是redis / stream读取部分。

有些带有魔术回调的库无法解决这些问题,当在任何节点等上运行超时时,回调将重试。复杂性/问题仍然存在,它们只能移至其他地方。

对于消费者来说,您可能有一个可观察的顶端,但这基本上是表面上的,它不能解决问题,并且如果您在某个地方的许多实现下查看,您将看到相同的循环。我不会使用它来让消费者注册一个动作。

例如

    public interface IStreamSubscriber
    {
        void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
        void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
        void Start();
    }    

在您的情况下,回叫可能具有可观察的功能,并且不使用循环,但是在下面有一个低级循环,它也可以为使用者进行消息到对象的转换。

© www.soinside.com 2019 - 2024. All rights reserved.