我们如何使用非阻塞 IO 并发读取文件行?
async await 的直接使用,如
BiggestLineSequential
中所示,导致顺序执行:
static async Task<string> BiggestLineSequential(String filename) {
string longestLine = "";
using (StreamReader reader = new StreamReader(filename)) {
while (!reader.EndOfStream)
{
var line = await reader.ReadLineAsync();
if (line != null && line.Length > longestLine.Length)
{
longestLine = line;
}
}
}
return longestLine;
}
在这种情况下,仅当
await reader.ReadLineAsync();
返回的 Task
完成时,对 ReadLineAsync()
的调用才会继续。因此,只有当上一行已经从文件中读取时,才会读取下一行。
为了实现并发行读取,我尝试了以下方法:
static string BiggestLineConcurrent(String filename) {
List<Task<string>> taskList = new List<Task<string>>();
using (StreamReader reader = new StreamReader(filename))
{
while (!reader.EndOfStream)
{
taskList.Add(reader.ReadLineAsync());
}
}
string longestLine = "";
foreach(Task<string> tsk in taskList)
{
string line = tsk.Result;
if (line != null && line.Length > longestLine.Length)
{
longestLine = line;
}
}
return longestLine;
}
在这种情况下,所有非阻塞 IO 操作都应该同时运行,而不是像第一个示例中所示的顺序运行。所有任务都启动并存储在列表中,然后在结果可用时继续处理。
但是,
BiggestLineConcurrent
会在 InvalidOperationException
行中抛出消息“The stream is currently in use by a previous operation on the stream.”。.NET 中是否有任何方法可以使用非阻塞 IO 并发读取行,而无需显式使用新线程或辅助线程池机制?
相反,您可以使用多个流从多个地方开始阅读。
while (!reader.EndOfStream)
当流到达分区的终点时,您需要关闭流,然后因为
static string BiggestLineConcurrent(String filename)
{
var maxDOP = Environment.ProcessorCount;
var fileLength = new FileInfo(filename).Length
var tasks = Partitioner.Create(0, fileLength - 1)
.GetPartitions(fileLength / maxDOP + 1)
.Select(t => ReadData(filename, t.Item1, t.Item2));
var longests = await Task.WhenAll(tasks);
return longests.MaxBy(l => l.Length);
async Task<string> ReadData(string filename2, long from, long to)
{
var longest = "";
using var stream = new FileStream(filename2, FileMode.Open, FileAccess.Read, FileShare.Read);
stream.Position = from;
using var reader = new StreamReader(stream);
string line;
try
{
while ((line = await reader.ReadLineAsync()) != null)
{
if (longest.Length < line.Length)
longest = line;
if (stream.Position > to)
stream.Close();
}
}
catch (ObjectDisposedException)
{ \\
}
return line;
}
}
可能会抛出,因此您需要吞下该异常。
您的另一种选择是使用某种.