因为我找不到任何不使用循环来获取流内容的实现,所以我开始实现一个流内容,但是我遇到了一些问题,也许你们中的一些人可以将我指向正确的位置。
该实现结合使用Pub / Sub和流:*日志->流通道* log:notification-> pub / sub* log:lastReadMessage->包含流中的最后一个读取键
发布者
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
while(true)
{
var value = new NameValueEntry[]
{
new NameValueEntry("id", Guid.NewGuid().ToString()),
new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
};
redisDb.StreamAdd("log", value);
var publisher = connectionMultiplexer.GetSubscriber();
publisher.Publish("log:notify", string.Empty, CommandFlags.None);
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
订阅者
static async Task Main(string[] args)
{
var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
var redisDb = connectionMultiplexer.GetDatabase(1);
var observableStream = CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
.Subscribe(x => {
Console.WriteLine(x);
});
Console.ReadLine();
}
private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(obs =>
{
var subscriber = connection.GetSubscriber();
subscriber.Subscribe($"{channel}:notify", async (ch, msg) =>
{
var locker = await taskFromStreamBlocker
.WaitAsync(0)
.ConfigureAwait(false);
if (!locker)
{
return;
}
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
taskFromStreamBlocker.Release();
});
return Disposable.Create(() => subscriber.Unsubscribe(channel));
});
}
为什么要使用信号灯?
因为我可以向流中添加很多消息,并且我不希望o将同一条消息处理两次。
问题
如果流中有未处理的消息,如何在没有发布/订阅事件的情况下进行处理当我们开始时,我们可以验证它是否为未处理的消息并对其进行处理。如果在这段时间中有新消息添加到流中,而我们仍未订阅发布/订阅,则订阅者将不会处理该消息,直到我们通过发布/订阅收到通知。
信号量对于不两次处理同一条消息很重要,但同时也是一种诅咒。在消息处理期间,可以将另一个消息添加到流中。发生这种情况时,订户将不会立即处理,而只会在下次收到通知时处理(此时将处理两条消息)。
您将如何实施?是否仅使用Rx实现Redis流?该解决方案不应使用某种循环,并且应提高内存效率。这可能吗?
最诚挚的祝福
Paulo Aboim Pinto
这是我要避免的WHILE解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
return Observable.Create<string>(async obs =>
{
while(!cancellationToken.IsCancellationRequested)
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
await Task.Delay(TimeSpan.FromMilliseconds(500));
}
return Disposable.Empty;
});
}
这是使用具有200ms经过时间的计时器的另一种解决方案
private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
{
var lastReadMessage = "0-0";
var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
if (string.IsNullOrEmpty(lastReadMessageData))
{
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
}
else
{
lastReadMessage = lastReadMessageData;
}
var instance = ThreadPoolScheduler.Instance;
return Observable.Create<string>(obs =>
{
var disposable = Observable
.Interval(TimeSpan.FromMilliseconds(200), instance)
.Subscribe(async _ =>
{
var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);
foreach(var message in messages)
{
obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
lastReadMessage = message.Id;
}
redisDb.KeyDelete($"{channel}:lastReadMessage");
redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
});
cancellationToken.Register(() => disposable.Dispose());
return Disposable.Empty;
});
}
我使用紧密循环,只是做一个XRange并保存一个位置-KISS ..但是,如果没有工作,它会退后,因此在有很多紧密循环时,它会非常快。
[如果您需要更高的性能,例如在处理时阅读,但是在大多数情况下,我会警告您不要这样做。
我不再使用分布式锁/信号量。
如果您处理命令(例如,执行某事而不是xyz),则这些操作可能会失败。再次,用户应处理已经发生的情况,而不是redis / stream读取部分。
有些带有魔术回调的库无法解决这些问题,当在任何节点等上运行超时时,回调将重试。复杂性/问题仍然存在,它们只能移至其他地方。
对于消费者来说,您可能有一个可观察的顶端,但这基本上是表面上的,它不能解决问题,并且如果您在某个地方的许多实现下查看,您将看到相同的循环。我不会使用它来让消费者注册一个动作。
例如
public interface IStreamSubscriber
{
void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
void Start();
}
在您的情况下,回叫可能具有可观察的功能,并且不使用循环,但是在下面有一个低级循环,它也可以为使用者进行消息到对象的转换。