枚举后,在非异步 IAsyncEnumerable<T> 中实例化一个一次性的

问题描述 投票:0回答:2

我想通过

IAsyncEnumerable<string>
公开从文件中读取的数据,由第三方库完成,因此我的调用代码可以在数据传入时对其进行处理。

问题是我必须将一次性(这里以

MemoryStream
为模型)传递给第三方库,并且第三方库的主类本身也必须被处理(参见
StreamReader
)。这是一个人为的例子,现实生活中确实需要双重处置。

var dependency = new UsingDependency();
var wrapper = new Wrapper(dependency);

// My calling code, fake reading a file and then process it:
using var stream = new MemoryStream("Multi\nLine\nString"u8.ToArray());

await foreach (var s in wrapper.ReadStringsAsync(stream))
{
    Console.WriteLine(s);
}

public class Wrapper
{
    private readonly UsingDependency _usingDependency;

    public Wrapper(UsingDependency usingDependency) { _usingDependency = UsingDependency; }

    // The "wrapper" method I have, to hide and dispose the StreamReader:
    public IAsyncEnumerable<string> ReadStringsAsync(Stream stream)
    {
        using var reader = new StreamReader(stream);

        if (reader.EndOfStream)
        {
            throw new InvalidOperationException("Can't read an empty file");
        }

        return _usingDependency.ReadStringsAsync(reader);
    }
}

Wrapper.ReadStringsAsync()
的全部意义在于向调用者隐藏
StreamReader
正在被使用,但出于所有意图和目的,它并不是真正的
StreamReader
并且必须在使用后处理它。

实际的依赖是这样的:

public class UsingDependency
{
    public async IAsyncEnumerable<string> ReadStringsAsync(StreamReader reader)
    {
        while (true)
        {
            var line = await reader.ReadLineAsync();
            if (line == null)
            {
                yield break;
            }

            yield return line;
        }
    }
}

我不能让我的代码在处理所有数据之前处理读者。不过,这段代码的作用是在读取第一行之前就处理 StreamReader:

System.ObjectDisposedException:无法从关闭的 TextReader 中读取。

如何重构非异步

IAsyncEnumerable<string> Wrapper.ReadStringsAsync()
方法,以便:

  1. 可以使用
    await foreach()
    调用。
  2. 它隐藏了调用者对
    StreamReader
    的使用。
  3. 它仍然拥有
    StreamReader
    ,因为它需要调用它的方法before返回
    IAsyncEnumerable
    .
  4. 它不会被异步化,因为它不能
    yield return
    .
  5. 它只处理
    StreamReader
    它的所有内容都被枚举之后。

在写这个问题的时候,我找到了两个解决方案:

选项 1,关于上面的第 4 点从异步方法返回 IAsyncEnumerable

确实让它异步,让它异步枚举结果并再次返回它们:

public async IAsyncEnumerable<string> ReadStringsAsync(Stream stream)
{
    using var reader = new StreamReader(stream);

    if (reader.EndOfStream)
    {
        throw new InvalidOperationException("Can't read an empty file");
    }

    await foreach (var s in _usingDependency.ReadStringsAsync(reader))
    {
        yield return s;
    }
}

但这感觉就像异步。这不会编译成一个无关的状态机和枚举器,它们除了代理实际工作和增加开销外什么都不做吗?


选项 2,关于上述第 5 点IAsyncEnumerable 方法中的正确处置?

通过使非异步返回一次性变量

out
变量:

public IAsyncEnumerable<string> ReadStringsAsync(Stream stream, out IDisposable disposeMe)
{
    var reader = new StreamReader(stream);

    disposeMe = reader;

    if (reader.EndOfStream)
    {
        throw new InvalidOperationException("Can't read an empty file");
    }

    return _usingDependency.ReadStringsAsync(reader);
}

然后在使用后处理:

IDisposable? disposeMe = null;

try
{
    await foreach (var s in wrapper.ReadStringsAsync(stream, out disposeMe))
    {
        Console.WriteLine(s);
    }
}
finally
{
    disposeMe?.Dispose();
}

是的,但不是。

后者的替代方法是创建一个自定义的

IAsyncEnumerable/tor
实现来处理枚举数。

那么,就可读性、可维护性和性能而言,这些方法中哪种是最可行的?

c# dispose iasyncenumerable
2个回答
1
投票

选项 1 是最好的,IMO.

使用

async
/
await
是自然的方法,如果代码过于急切地省略async
/
await
早期处置问题是一个常见问题
。问题是一样的;它只是应用于异步序列而不是类似任务。

这不会编译成一个无关的状态机和枚举器,它们除了代理实际工作和增加开销外什么都不做吗?

没有;它编译成一个状态机,该状态机跟踪枚举器状态并在枚举器被释放时释放您的资源。 ;) 换句话说,你让编译器为你编写“替代”选项。

如果这只是一个

await
,就不会显得奇怪了。怪异是因为
foreach
+
yield
。但这是一个平行的案例;同样的道理,
foreach
+
yield
是合适的。

其他语言(例如 Python)确实有一个

yield*
运算符,它本质上是
foreach
+
yield
(及其异步对应物),但 C# 没有(对于异步代码和同步代码都没有)。


0
投票

我会建议支持异步取消,并将读者置于

finally

using System.Runtime.CompilerServices;

public class UsingDependency
{
    static async Task Main()
    {
        var source = new StringReader(@"some
data
here
we
won't
read
it
all");

        var obj = new UsingDependency();
        var cancel = new CancellationTokenSource();
        int count = 0;
        try
        {
            await foreach (var item in obj.ReadStringsAsync(source).WithCancellation(cancel.Token))
            {
                Console.WriteLine("got: " + item);
                if (++count == 3)
                {
                    cancel.Cancel();
                    // break; // alternative: *only* works cleanly if we exit
                              // *between* async read operations
                }
            }
        }
        catch (OperationCanceledException oce) when (oce.CancellationToken == cancel.Token)
        {
            Console.WriteLine("detected self-cancellation");
        }
    }
    public IAsyncEnumerable<string> ReadStringsAsync(TextReader reader)
    {
        return Impl(reader, default);
        static async IAsyncEnumerable<string> Impl(TextReader reader, [EnumeratorCancellation] CancellationToken cancellation)
        {
            try {
                while (true) // or possibly while (!cancellation.IsCancellationRequested), which exits more cleanly
                // *if* cancellation happens *between* async reads
                {
                    var line = await reader.ReadLineAsync(cancellation);
                    if (line == null)
                    {
                        yield break;
                    }

                    yield return line;
                }
            }
            // uncomment if you want to exit without throwing on cancellation during ReadLineAsync
            //catch (OperationCanceledException oce) when (oce.CancellationToken == cancellation)
            //{ }  
            finally
            {
                Console.WriteLine("We disposed the reader, yay!");
                reader.Dispose();
            }
        }
    }
}

要点是:

  • 通过在
    finally
    ReadStringsAsync
    中放置,我们确保它位于适当的位置
  • 通过支持取消,我们允许枚举器的选择性短路包括异步操作正在进行时
  • 请注意,如果您刚刚退出 await foreach
    between
    阅读,我们 不需要 需要取消;
    finally
    仍将被调用(通过枚举器上的
    Dispose()
    );您可以通过将
    cancel.Cancel();
    替换为
    break;
  • 来看到这一点
© www.soinside.com 2019 - 2024. All rights reserved.