问题
我正在尝试创建一种方法来侦听来自 Redis 服务器的流,并根据当前的流类型返回流条目列表。为此,我创建了一个简单的抽象基类,其中包含我期望从所有流条目类型获得的属性
public abstract class RedisStreamEntryDto
{
public required float Progress { get; set; }
public required bool RunDone { get; set; }
}
监听流的类看起来像下面的代码,它已经说明了问题:
public class RedisUtils(ConnectionMultiplexer redis)
{
public async Task<List<RedisStreamEntryDto>> GetStreamEntriesAsync(string streamName):
{
var db = redis.GetDatabase();
bool stopListening = false;
List<RedisStreamEntryDto> streamItems = [];
while (!stopListening)
{
StreamEntry[] streamEntries = await db.StreamReadAsync(streamName, "0-0");
foreach (var entry in streamEntries)
{
if(entry.IsNull)
{
continue;
}
var add_entry = // how to instantiate?
streamItems.Add(add_entry);
if (add_entry.RunDone)
{
stopListening = true;
break;
}
}
}
return streamItems;
}
}
通常我会通过传递 JSONS 并调用
JsonSerializer.Deserialize<T>(entry)
来处理这个问题,但接收到的值是 RedisValue
类型并且没有内置反序列化,或者更确切地说,我首先必须创建一个看起来很 hacky 的 JSON 字符串。
接近
想法是简单地强制所有继承自
RedisStreamEntryDto
的类实现返回正确实例的静态 FromStreamEntry(StreamEntry entry)
方法。我遇到的问题是如何在基类中实现它以便能够使用它。抽象静态方法是不可能的,我也已经尝试使用接口而不是抽象类,但这基本上会导致同样的问题。
到目前为止我能想到的唯一解决方案是创建一个单独的工厂类,这对于这个“简单”问题来说有点烦人。有没有更好的解决办法?
编辑 基于以下评论的一些附加信息:我从调用者那里知道需要哪种类型的流条目,因此我可以使该方法通用:
public class RedisUtils(ConnectionMultiplexer redis)
{
public async Task<List<T>> GetStreamEntriesAsync<T>(string streamName) where T: RedisStreamEntryDto
{
var db = redis.GetDatabase();
bool stopListening = false;
List<T> streamItems = [];
while (!stopListening)
{
StreamEntry[] streamEntries = await db.StreamReadAsync(streamName, "0-0");
foreach (var entry in streamEntries)
{
if(entry.IsNull)
{
continue;
}
var add_entry = T.FromStreamEntry(entry) // how to instantiate?
streamItems.Add(add_entry);
if (add_entry.RunDone)
{
stopListening = true;
break;
}
}
}
return streamItems;
}
}
问题仍然存在,如何实现基本抽象类(或接口)才能做到这一点。
因此,在上面评论中的讨论之后,我的实现如下所示(显然,这只是单个文件中的测试实现,没有错误检查和所有其他需要的东西):
public interface IRedisStreamEntryFactory
{
RedisStreamEntryDto CreateEntryFromStreamEntry(StreamEntry entry);
}
public class ConcreteTaskStreamEntryFactory : IRedisStreamEntryFactory
{
public RedisStreamEntryDto CreateEntryFromStreamEntry(StreamEntry entry)
{
return new MyConcreteTaskStreamEntryDto()
{
Message = entry["message"].ToString(),
Progress = float.Parse(entry["progress"]),
RunDone = bool.Parse(entry["run_done"])
};
}
}
public class OtherConcreteTaskStreamEntryFactory : IRedisStreamEntryFactory
{
public RedisStreamEntryDto CreateEntryFromStreamEntry(StreamEntry entry)
{
return new MyOtherConcreteTaskStreamEntryDto()
{
OtherProperty = entry["other_property"].ToString(),
Progress = float.Parse(entry["progress"]),
RunDone = bool.Parse(entry["run_done"])
};
}
}
public class RedisUtils(ConnectionMultiplexer redis)
{
public async Task<List<RedisStreamEntryDto>> GetStreamEntriesAsync(IRedisStreamEntryFactory factory, string streamName)
{
var db = redis.GetDatabase();
bool stopListening = false;
List<RedisStreamEntryDto> streamItems = [];
while (!stopListening)
{
StreamEntry[] streamEntries = await db.StreamReadAsync(streamName, "0-0");
foreach (var entry in streamEntries)
{
if(entry.IsNull)
{
continue;
}
var add_entry = factory.CreateEntryFromStreamEntry(entry);
streamItems.Add(add_entry);
if (add_entry.RunDone)
{
stopListening = true;
break;
}
}
}
return streamItems;
}
}
我猜这只是一个简单的工厂模式。