如何使非阻塞 ReadLineAsync() 在 .NET 中并发运行?

问题描述 投票:0回答:1

我们如何使用非阻塞 IO 并发读取文件行?

async await 的直接使用,如

BiggestLineSequential
中所示,导致顺序执行:

static async Task<string> BiggestLineSequential(String filename) {
    string longestLine = "";
    using (StreamReader reader = new StreamReader(filename)) {
            while (!reader.EndOfStream)
            {
                var line = await reader.ReadLineAsync();
                if (line != null && line.Length > longestLine.Length)
                {
                    longestLine = line;
                }
            }
        }
        return longestLine;
}

在这种情况下,仅当

await reader.ReadLineAsync();
返回的
Task
完成时,对
ReadLineAsync()
的调用才会继续。因此,只有当上一行已经从文件中读取时,才会读取下一行。

为了实现并发行读取,我尝试了以下方法:

static string BiggestLineConcurrent(String filename) {
      List<Task<string>> taskList = new List<Task<string>>();
      using (StreamReader reader = new StreamReader(filename))
      {
            while (!reader.EndOfStream)
            {
                  taskList.Add(reader.ReadLineAsync());
            }
      }

      string longestLine = "";
      foreach(Task<string> tsk in taskList)
      {
            string line = tsk.Result;
            if (line != null && line.Length > longestLine.Length)
            {
                  longestLine = line;
            }
      }

      return longestLine;
}

在这种情况下,所有非阻塞 IO 操作都应该同时运行,而不是像第一个示例中所示的顺序运行。所有任务都启动并存储在列表中,然后在结果可用时继续处理。

但是,

BiggestLineConcurrent
会在
InvalidOperationException
行中抛出消息“The stream is currently in use by a previous operation on the stream.”。
.NET 中是否有任何方法可以使用非阻塞 IO 并发读取行,而无需显式使用新线程或辅助线程池机制?

c# .net nonblocking
1个回答
0
投票

相反,您可以使用多个流从多个地方开始阅读。

while (!reader.EndOfStream)

当流到达分区的终点时,您需要关闭流,然后因为
static string BiggestLineConcurrent(String filename) { var maxDOP = Environment.ProcessorCount; var fileLength = new FileInfo(filename).Length var tasks = Partitioner.Create(0, fileLength - 1) .GetPartitions(fileLength / maxDOP + 1) .Select(t => ReadData(filename, t.Item1, t.Item2)); var longests = await Task.WhenAll(tasks); return longests.MaxBy(l => l.Length); async Task<string> ReadData(string filename2, long from, long to) { var longest = ""; using var stream = new FileStream(filename2, FileMode.Open, FileAccess.Read, FileShare.Read); stream.Position = from; using var reader = new StreamReader(stream); string line; try {          while ((line = await reader.ReadLineAsync()) != null)           {            if (longest.Length < line.Length) longest = line; if (stream.Position > to) stream.Close(); }          } catch (ObjectDisposedException) { \\ } return line;      } }

可能会抛出,因此您需要吞下该异常。

您的另一种选择是使用某种

StreamReader

.

© www.soinside.com 2019 - 2024. All rights reserved.