Redis Stream 我需要事件驱动架构,

问题描述 投票:0回答:2

当流中收到任何新消息时,应触发 OnMessageReceived 事件。

public event EventHandler<MqMessageReceivedEventArgs> OnMessageReceived;


var result = await redisDatabase.StreamRangeAsync(StreamName, "-", "+", 1, Order.Ascending);

  protected virtual void OnMessageReceivedEvent(MqMessageReceivedEventArgs e)
  {
      OnMessageReceived?.Invoke(this, e);
  }


c# .net redis stackexchange.redis redis-streams
2个回答
0
投票

根据官方文档,您必须执行以下操作:

定义传递函数:

public static class RedisHelper
{
    Dictionary<string, string> ParseResult(StreamEntry entry) => 
        entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
}

启动消费者任务:


var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var readTask = Task.Run(async () =>
{
    while (!token.IsCancellationRequested)
    {
        var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
        if (result.Any())
        {
            var dictionaries = result.Select(r => ParseResult(r)).ToList();
            // or invoke event in loop
            OnMessageReceivedEvent(new MqMessageReceivedEventArgs(dictionaries))
        }

        await Task.Delay(1000);
    }
});

0
投票

看起来您正在尝试调用/触发 OnMessageReceivedEvent,当调用此事件时,带有新消息的事件将在流中接收。假设逻辑已经实现并且它监听流中的消息。

public async Task ListenForMessages()
{
    while (true)
    {
        var result = await redisDatabase.StreamRangeAsync(StreamName, "-", "+", 1, Order.Ascending);
        if (result.Length > 0)
        {
            foreach (var message in result)
            {
                OnMessageReceived?.Invoke(this, new MqMessageReceivedEventArgs(message));
            }
        }
        await Task.Delay(1000); // Delay before checking for new messages again
    }
}
public class MqMessageReceivedEventArgs : EventArgs
{
public MqMessageReceivedEventArgs(StreamEntry message)
{
    // Initialize any properties you want to pass along with the event
}
}
© www.soinside.com 2019 - 2024. All rights reserved.