我想通过
IAsyncEnumerable<string>
公开从文件中读取的数据,由第三方库完成,因此我的调用代码可以在数据传入时对其进行处理。
问题是我必须将一次性(这里以
MemoryStream
为模型)传递给第三方库,并且第三方库的主类本身也必须被处理(参见StreamReader
)。这是一个人为的例子,现实生活中确实需要双重处置。
var dependency = new UsingDependency();
var wrapper = new Wrapper(dependency);
// My calling code, fake reading a file and then process it:
using var stream = new MemoryStream("Multi\nLine\nString"u8.ToArray());
await foreach (var s in wrapper.ReadStringsAsync(stream))
{
Console.WriteLine(s);
}
public class Wrapper
{
private readonly UsingDependency _usingDependency;
public Wrapper(UsingDependency usingDependency) { _usingDependency = UsingDependency; }
// The "wrapper" method I have, to hide and dispose the StreamReader:
public IAsyncEnumerable<string> ReadStringsAsync(Stream stream)
{
using var reader = new StreamReader(stream);
if (reader.EndOfStream)
{
throw new InvalidOperationException("Can't read an empty file");
}
return _usingDependency.ReadStringsAsync(reader);
}
}
Wrapper.ReadStringsAsync()
的全部意义在于向调用者隐藏 StreamReader
正在被使用,但出于所有意图和目的,它并不是真正的 StreamReader
并且必须在使用后处理它。
实际的依赖是这样的:
public class UsingDependency
{
public async IAsyncEnumerable<string> ReadStringsAsync(StreamReader reader)
{
while (true)
{
var line = await reader.ReadLineAsync();
if (line == null)
{
yield break;
}
yield return line;
}
}
}
我不能让我的代码在处理所有数据之前处理读者。不过,这段代码的作用是在读取第一行之前就处理 StreamReader:
System.ObjectDisposedException:无法从关闭的 TextReader 中读取。
如何重构非异步
IAsyncEnumerable<string> Wrapper.ReadStringsAsync()
方法,以便:
await foreach()
调用。StreamReader
的使用。StreamReader
,因为它需要调用它的方法before返回IAsyncEnumerable
.yield return
.StreamReader
在它的所有内容都被枚举之后。在写这个问题的时候,我找到了两个解决方案:
选项 1,关于上面的第 4 点:从异步方法返回 IAsyncEnumerable
确实让它异步,让它异步枚举结果并再次返回它们:
public async IAsyncEnumerable<string> ReadStringsAsync(Stream stream)
{
using var reader = new StreamReader(stream);
if (reader.EndOfStream)
{
throw new InvalidOperationException("Can't read an empty file");
}
await foreach (var s in _usingDependency.ReadStringsAsync(reader))
{
yield return s;
}
}
但这感觉就像异步。这不会编译成一个无关的状态机和枚举器,它们除了代理实际工作和增加开销外什么都不做吗?
选项 2,关于上述第 5 点:IAsyncEnumerable 方法中的正确处置?
通过使非异步返回一次性变量
out
变量:
public IAsyncEnumerable<string> ReadStringsAsync(Stream stream, out IDisposable disposeMe)
{
var reader = new StreamReader(stream);
disposeMe = reader;
if (reader.EndOfStream)
{
throw new InvalidOperationException("Can't read an empty file");
}
return _usingDependency.ReadStringsAsync(reader);
}
然后在使用后处理:
IDisposable? disposeMe = null;
try
{
await foreach (var s in wrapper.ReadStringsAsync(stream, out disposeMe))
{
Console.WriteLine(s);
}
}
finally
{
disposeMe?.Dispose();
}
是的,但不是。
后者的替代方法是创建一个自定义的
IAsyncEnumerable/tor
实现来处理枚举数。
那么,就可读性、可维护性和性能而言,这些方法中哪种是最可行的?
选项 1 是最好的,IMO.
使用
async
/await
是自然的方法,如果代码过于急切地省略async
/
await
,早期处置问题是一个常见问题。问题是一样的;它只是应用于异步序列而不是类似任务。
这不会编译成一个无关的状态机和枚举器,它们除了代理实际工作和增加开销外什么都不做吗?
没有;它编译成一个状态机,该状态机跟踪枚举器状态并在枚举器被释放时释放您的资源。 ;) 换句话说,你让编译器为你编写“替代”选项。
如果这只是一个
await
,就不会显得奇怪了。怪异是因为foreach
+yield
。但这是一个平行的案例;同样的道理,foreach
+yield
是合适的。
其他语言(例如 Python)确实有一个
yield*
运算符,它本质上是 foreach
+yield
(及其异步对应物),但 C# 没有(对于异步代码和同步代码都没有)。
我会建议支持异步取消,并将读者置于
finally
:
using System.Runtime.CompilerServices;
public class UsingDependency
{
static async Task Main()
{
var source = new StringReader(@"some
data
here
we
won't
read
it
all");
var obj = new UsingDependency();
var cancel = new CancellationTokenSource();
int count = 0;
try
{
await foreach (var item in obj.ReadStringsAsync(source).WithCancellation(cancel.Token))
{
Console.WriteLine("got: " + item);
if (++count == 3)
{
cancel.Cancel();
// break; // alternative: *only* works cleanly if we exit
// *between* async read operations
}
}
}
catch (OperationCanceledException oce) when (oce.CancellationToken == cancel.Token)
{
Console.WriteLine("detected self-cancellation");
}
}
public IAsyncEnumerable<string> ReadStringsAsync(TextReader reader)
{
return Impl(reader, default);
static async IAsyncEnumerable<string> Impl(TextReader reader, [EnumeratorCancellation] CancellationToken cancellation)
{
try {
while (true) // or possibly while (!cancellation.IsCancellationRequested), which exits more cleanly
// *if* cancellation happens *between* async reads
{
var line = await reader.ReadLineAsync(cancellation);
if (line == null)
{
yield break;
}
yield return line;
}
}
// uncomment if you want to exit without throwing on cancellation during ReadLineAsync
//catch (OperationCanceledException oce) when (oce.CancellationToken == cancellation)
//{ }
finally
{
Console.WriteLine("We disposed the reader, yay!");
reader.Dispose();
}
}
}
}
要点是:
finally
的ReadStringsAsync
中放置,我们确保它位于适当的位置await foreach
between阅读,我们 不需要 需要取消;
finally
仍将被调用(通过枚举器上的 Dispose()
);您可以通过将cancel.Cancel();
替换为break;