This is my code for reading a CSV file asynchronously with the ReadLineAsync() method of the StreamReader class, but it only reads the first line of the CSV file:
private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken) {
    Stopwatch sw = new Stopwatch();
    sw.Start();
    string filePath = @"/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";
    using (StreamReader oStreamReader = new StreamReader(File.OpenRead(filePath))) {
        string sFileLine = await oStreamReader.ReadLineAsync();
        string[] jointDataArray = sFileLine.Split(',');
        // Assuming the joint data is processed in parallel
        var tasks = new List<Task>();
        // Process joint pose
        tasks.Add(Task.Run(async () => {
            var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
            var jointPoseJson = JsonSerializer.Serialize(jointPose);
            await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
        }));
        // Process joint velocity
        tasks.Add(Task.Run(async () => {
            var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
            var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
            await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
        }));
        // Process joint acceleration
        tasks.Add(Task.Run(async () => {
            var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
            var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
            await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
        }));
        // Process external wrench
        tasks.Add(Task.Run(async () => {
            var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
            var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
            await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
        }));
        await Task.WhenAll(tasks);
    }
    sw.Stop();
    _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
}
Basically, the CSV file has 10128 lines. I want to read the most recent line added to the CSV file.
How can I do that?
Using File.ReadLines(filePath), as in the code below, throws this exception:
Unhandled exception. System.IO.PathTooLongException: The path '/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/-2.27625e-06,-0.78542,-3.79241e-06,-2.35622,5.66111e-06,3.14159,0.785408,0.00173646,-0.0015847,0.000962475,-0.00044469,-0.000247682,-0.000270337,0.000704195,0.000477503,0.000466693,-6.50664e-05,0.00112044,-2.47425e-06,0.000445592,-0.000685786,1.21642,-0.853085,-0.586162,-0.357496,-0.688677,0.230229' is too long, or a component of the specified path is too long.
private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken) {
    Stopwatch sw = new Stopwatch();
    sw.Start();
    string filePath = @"/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";
    using (StreamReader oStreamReader = new StreamReader(File.ReadLines(filePath).Last())) {
        string sFileLine = await oStreamReader.ReadLineAsync();
        string[] jointDataArray = sFileLine.Split(',');
        // Assuming the joint data is processed in parallel
        var tasks = new List<Task>();
        // Process joint pose
        tasks.Add(Task.Run(async () => {
            var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
            var jointPoseJson = JsonSerializer.Serialize(jointPose);
            await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
        }));
        // Process joint velocity
        tasks.Add(Task.Run(async () => {
            var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
            var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
            await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
        }));
        // Process joint acceleration
        tasks.Add(Task.Run(async () => {
            var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
            var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
            await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
        }));
        // Process external wrench
        tasks.Add(Task.Run(async () => {
            var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
            var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
            await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
        }));
        await Task.WhenAll(tasks);
    }
    sw.Stop();
    _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
}
The slow (but reliable) way is to read every line in the file and return the last one. Something like:
static string? GetLastLine(string filename)
{
    using StreamReader reader = new(filename);
    string? last = null;
    // Read to the end of the file, keeping only the most recent line.
    while (reader.ReadLine() is string line)
        last = line;
    return last;
}
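Of course, this allocates a lot of strings for no good reason, all of which the GC will throw away shortly afterwards. Not ideal, but that's how it is. (An async version is trivial to write, but we're only talking about a few milliseconds of runtime per MB. Your call.)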
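For reference, here is a minimal sketch of what that async variant might look like. The class name LastLineReader is made up for illustration; the only real change from the synchronous version is swapping ReadLine() for ReadLineAsync().

```csharp
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper class; the name is an assumption for this sketch.
static class LastLineReader
{
    // Reads every line asynchronously and keeps only the last one.
    // Returns null if the file contains no lines.
    public static async Task<string?> GetLastLineAsync(string filename)
    {
        using StreamReader reader = new(filename);
        string? last = null;
        while (await reader.ReadLineAsync() is string line)
            last = line;
        return last;
    }
}
```

It still reads the whole file, so it has the same allocation cost as the synchronous version.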
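To make it faster, you could try reading only the tail end of the file, just enough to get a few lines, and process that. You'll need to know the maximum line length so you can guarantee that you get at least one complete line. Assuming you're working with ASCII or UTF8 data, you could probably do something like this: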
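static string? GetLastLine(string filename, int maxLineLength)
{
    using Stream stream = File.OpenRead(filename);
    // Seek back far enough to cover at least one full line.
    // Clamp to 0 so files shorter than the window don't throw.
    stream.Position = Math.Max(0L, stream.Length - maxLineLength * 3L / 2);
    using StreamReader reader = new(stream);
    string? last = null;
    // The first line read may be partial; only the last one matters.
    while (reader.ReadLine() is string line)
        last = line;
    return last;
}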
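(This should run in fairly constant time regardless of file size, since it always processes the same amount of data. On my machine, with maxLineLength = 150, a 1MB file takes about 100 microseconds. Again, converting it to an async version is simple.)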
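A rough sketch of that async conversion, under a few assumptions of mine: the class name TailReader is hypothetical, the file is opened with FileShare.ReadWrite so another process can keep appending to it, and the seek offset is clamped to zero so short files work too.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper class; the name is an assumption for this sketch.
static class TailReader
{
    // Reads only the tail of the file and returns its last line.
    // maxLineLength is the longest line (in bytes) expected in the file.
    public static async Task<string?> GetLastLineAsync(string filename, int maxLineLength)
    {
        // useAsync: true requests asynchronous file I/O;
        // FileShare.ReadWrite lets a writer keep the file open.
        using FileStream stream = new(filename, FileMode.Open, FileAccess.Read,
            FileShare.ReadWrite, bufferSize: 4096, useAsync: true);
        // Clamp to 0 so files shorter than the window don't throw.
        stream.Position = Math.Max(0L, stream.Length - maxLineLength * 3L / 2);
        using StreamReader reader = new(stream);
        string? last = null;
        // The first line read may be partial; only the last one matters.
        while (await reader.ReadLineAsync() is string line)
            last = line;
        return last;
    }
}
```

If the seek lands in the middle of a multi-byte UTF8 character, the first (partial) line may decode oddly, but it is discarded as long as at least one more line follows it.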
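Finally, if you expect lines to be added to the file over time and only want to read the new ones, keep track of the file size and resume reading from that point whenever the size changes.
As pointed out in the comments, the above isn't particularly useful if you want to read new lines that come from outside your program. Not only does it always return the last line of the file regardless of what has changed, but if several lines are added you will only get the last one. Almost certainly not what you want.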
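What you probably want is a way to read new lines as they come in, without skipping any. That means tracking changes in the file length and resuming reading one line at a time. You don't want to keep the file locked for long, and because StreamReader buffers ahead, you can't rely on stream.Position lining up with the end of the line you just read.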
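To make the buffering point concrete, here is a small sketch (the BufferingDemo name is made up) that reads a single line and reports where the underlying stream ended up. For a small file the reader usually pulls the entire content into its buffer on the first read, so the reported position is well past the end of the first line.

```csharp
using System.IO;

// Hypothetical demo class; the name is an assumption for this sketch.
static class BufferingDemo
{
    // Reads one line and returns it together with the position of the
    // underlying stream, which reflects how much the reader buffered,
    // not how much of the text was consumed.
    public static (string? Line, long Position) ReadOneLine(string path)
    {
        using FileStream stream = File.OpenRead(path);
        using StreamReader reader = new(stream);
        string? line = reader.ReadLine();
        return (line, stream.Position);
    }
}
```

Calling it on a file containing "first\nsecond\nthird\n" returns "first" as the line, but a position beyond byte 6, which is where the first line actually ends.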
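The solution is to read every line you can on each pass and save them for subsequent reads. We can pull the new lines into a Queue<> and track the size of the file to detect when there is more to read. Something like this: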
sealed class LineReader
{
    // File to read from.
    private readonly string _filename;
    // Queue for when multiple new lines are added between reads.
    private readonly Queue<string> _queue = new();
    // Position in the file after the last read.
    private long _lastPosition;

    // True if there's something to read.
    public bool LinesAvailable
    {
        get
        {
            if (_queue.Count > 0)
                return true;
            FileInfo fi = new(_filename);
            return fi.Exists && fi.Length != _lastPosition;
        }
    }

    public LineReader(string filename, bool readExistingLines = false)
    {
        _filename = filename;
        FileInfo fi = new(_filename);
        _lastPosition = readExistingLines || !fi.Exists ? 0 : fi.Length;
    }

    // Returns when a line is read or token is cancelled.
    public async Task<string?> WaitNextLineAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (_queue.TryDequeue(out string? queued))
                return queued;
            if (await GetNextLineAsync() is string line)
                return line;
            try
            {
                await Task.Delay(20, token);
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }
        return null;
    }

    // Read next line, or `null` if none available.
    public async Task<string?> GetNextLineAsync()
    {
        if (_queue.TryDequeue(out string? line))
            return line;
        // Check if file length has changed.
        FileInfo fi = new(_filename);
        if (!fi.Exists || fi.Length == _lastPosition)
            return null;
        // Open the stream. May fail if file is locked.
        Stream stream;
        try
        {
            stream = fi.OpenRead();
        }
        catch
        {
            return null;
        }
        using (stream)
        {
            // If file is smaller assume it was cleared, read all lines.
            // Otherwise, go to our last position.
            if (stream.Length >= _lastPosition)
                stream.Position = _lastPosition;
            using StreamReader reader = new(stream);
            // Queue up new lines
            while (await reader.ReadLineAsync() is string next)
            {
                // Ignore empty lines.
                if (!string.IsNullOrEmpty(next))
                    _queue.Enqueue(next);
            }
            // Record how far we read while the stream is still open;
            // querying the stream after it is disposed would throw.
            _lastPosition = stream.Position;
        }
        // Return the first queued line, or null if nothing was read.
        _queue.TryDequeue(out line);
        return line;
    }
}
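GetNextLineAsync() tries to retrieve the next available line, while WaitNextLineAsync(...) spins asynchronously until a line is read. Cancellation exits rather than throwing, which is handy if you want to use a CancelAfter token to provide a timeout: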
async Task ProcessNextLineAsync(LineReader reader, CancellationToken token)
{
    // Link to the caller's token and add a one-second timeout.
    using CancellationTokenSource source = CancellationTokenSource.CreateLinkedTokenSource(token);
    source.CancelAfter(TimeSpan.FromSeconds(1));
    if (await reader.WaitNextLineAsync(source.Token) is string line)
    {
        // do something with the line
    }
}
(Yes, I could have made a simpler version. You did ask for async, though.)
Note: this is not thread-safe.
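If several callers do need to share one reader, one possible approach (not part of the answer above, just a sketch) is to serialize access with a SemaphoreSlim. The SynchronizedLineSource class below is hypothetical; it wraps any async line-reading delegate so that only one call runs at a time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper; the name and shape are assumptions for this sketch.
// It serializes calls to an async line source that is not thread-safe.
sealed class SynchronizedLineSource
{
    private readonly Func<CancellationToken, Task<string?>> _readNext;
    // A semaphore with a single slot acts as an async-friendly lock.
    private readonly SemaphoreSlim _gate = new(1, 1);

    public SynchronizedLineSource(Func<CancellationToken, Task<string?>> readNext)
        => _readNext = readNext;

    // Waits for exclusive access, then forwards to the wrapped source.
    public async Task<string?> ReadNextAsync(CancellationToken token = default)
    {
        await _gate.WaitAsync(token);
        try
        {
            return await _readNext(token);
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

With the LineReader above, this could be constructed as new SynchronizedLineSource(reader.WaitNextLineAsync). Note that, unlike WaitNextLineAsync, waiting on the semaphore throws OperationCanceledException if the token is cancelled while another caller holds the gate.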