如何使用ReadLineAsync方法从csv文件中读取最新行?

问题描述 投票:0回答:1

这是我使用 StreamReader 类中的 ReadLineAsync() 函数异步读取 csv 文件的代码,但它仅读取 csv 文件的第一行

    private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken) {
     Stopwatch sw = new Stopwatch();
     sw.Start();

     string filePath = @ "/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";

     using(StreamReader oStreamReader = new StreamReader(File.OpenRead(filePath))) {
       string sFileLine = await oStreamReader.ReadLineAsync();

       string[] jointDataArray = sFileLine.Split(',');

       // Assuming the joint data is processed in parallel
       var tasks = new List < Task > ();

       // Process joint pose
       tasks.Add(Task.Run(async () => {
         var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
         var jointPoseJson = JsonSerializer.Serialize(jointPose);
         await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
       }));

       // Process joint velocity
       tasks.Add(Task.Run(async () => {
         var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
         var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
         await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
       }));

       // Process joint acceleration
       tasks.Add(Task.Run(async () => {
         var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
         var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
         await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
       }));

       // Process external wrench
       tasks.Add(Task.Run(async () => {
         var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
         var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
         await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
       }));

       await Task.WhenAll(tasks);
     }

     sw.Stop();
     _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
   }

基本上,csv 文件有 10128 行。我想读取添加到 csv 文件中的最新行。

我该怎么做?

使用 File.ReadLine(filePath) 会抛出此异常

未处理的异常。 System.IO.PathTooLongException:路径 '/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/-2.27625e-06,-0.78542,-3.79241e-06,-2.35622,5.66111e-06,3.14159 ,0.785408,0.00173646,-0.0015847,0.000962475,-0.00044469,-0.000247682,-0.000270337,0.000704195,0.000477503,0.000466693,-6.50 664e-05,0.00112044,-2.47425e-06,0.000445592,-0.000685786,1.21642,-0.853085,- 0.586162,-0.357496,-0.688677,0.230229' 太长,或者指定路径的某个组件太长。

private async Task ReadAndSendJointDataFromCSVFileAsync(CancellationToken cancellationToken) {
  Stopwatch sw = new Stopwatch();
  sw.Start();

  string filePath = @ "/home/adwait/azure-iot-sdk-csharp/iothub/device/samples/solutions/PnpDeviceSamples/Robot/Data/Robots_data.csv";

  using(StreamReader oStreamReader = new StreamReader(File.ReadLInes(filePath).Last())) {
    string sFileLine = await oStreamReader.ReadLineAsync();

    string[] jointDataArray = sFileLine.Split(',');

    // Assuming the joint data is processed in parallel
    var tasks = new List < Task > ();

    // Process joint pose
    tasks.Add(Task.Run(async () => {
      var jointPose = jointDataArray.Take(7).Select(Convert.ToSingle).ToArray();
      var jointPoseJson = JsonSerializer.Serialize(jointPose);
      await SendTelemetryAsync("JointPose", jointPoseJson, cancellationToken);
    }));

    // Process joint velocity
    tasks.Add(Task.Run(async () => {
      var jointVelocity = jointDataArray.Skip(7).Take(7).Select(Convert.ToSingle).ToArray();
      var jointVelocityJson = JsonSerializer.Serialize(jointVelocity);
      await SendTelemetryAsync("JointVelocity", jointVelocityJson, cancellationToken);
    }));

    // Process joint acceleration
    tasks.Add(Task.Run(async () => {
      var jointAcceleration = jointDataArray.Skip(14).Take(7).Select(Convert.ToSingle).ToArray();
      var jointAccelerationJson = JsonSerializer.Serialize(jointAcceleration);
      await SendTelemetryAsync("JointAcceleration", jointAccelerationJson, cancellationToken);
    }));

    // Process external wrench
    tasks.Add(Task.Run(async () => {
      var externalWrench = jointDataArray.Skip(21).Take(6).Select(Convert.ToSingle).ToArray();
      var externalWrenchJson = JsonSerializer.Serialize(externalWrench);
      await SendTelemetryAsync("ExternalWrench", externalWrenchJson, cancellationToken);
    }));

    await Task.WhenAll(tasks);
  }

  sw.Stop();
  _logger.LogDebug(String.Format("Elapsed={0}", sw.Elapsed));
}
c# csv
1个回答
1
投票

缓慢(但可靠)的方法是读取文件中的所有行并返回最后一行。比如:

static string? GetLastLine(string filename)
{
    using StreamReader reader = new(filename);
    string? last = null;
    while (reader.ReadLine() is string line)
        last = line;
    return last;
}

当然,这会无缘无故地分配大量字符串,GC 很快就会将它们全部删除。并不理想,但事情就是这样。 (

async
版本制作起来很简单,但是每 MB 的运行时间为几毫秒。由您决定。)

为了让它运行得更快,您可以尝试只读取文件的最后部分 - 足以获取几行 - 并处理它。您需要知道最大行长度,这样您才能保证至少获得一整行。假设您正在使用 ASCII 或 UTF8 数据,您可能可以执行以下操作:

static string? GetLastLine(string filename, int maxLineLength)
{
    using Stream stream = File.OpenRead(filename);
    stream.Position = stream.Length - maxLineLength * 3 / 2;
    using StreamReader reader = new(stream);
    string? last = null;
    while (reader.ReadLine() is string line)
        last = line;
    return last;
}

(无论文件大小如何,这应该在相当恒定的时间内工作,因为它总是处理相同数量的数据 - 在我的机器上使用

maxLineLength = 150
处理 1MB 文件大约需要 100 微秒。再次,转换为
async
版本很简单。)

最后,如果您希望随着时间的推移将行添加到文件中,并且只想从文件中读取新行,请跟踪文件大小并在文件大小发生变化时从那里恢复读取。


正如评论中所指出的,如果您想读取来自程序外部的新行,则上述内容并不是特别有用。无论发生什么变化,它不仅总是返回文件的最后一行,而且如果添加多行,您也只能得到最后一行。几乎肯定不是您想要的。

您可能想要的是一种在新行进来时读取它们的方法,而不是跳过任何新行。这意味着跟踪文件长度的变化并恢复一次读取一行。您不想将文件锁定太久,并且由于

StreamReader
前面有缓冲区,因此您不能依赖
stream.Position
与您刚刚读取的行末尾对齐。

解决方案是每次读取所有可以读取的行并保存以供后续读取。我们可以将新行拉入

Queue<>
并跟踪文件的大小以检测何时有更多内容需要读取。像这样的东西:

sealed class LineReader
{
    // File to read from.
    private readonly string _filename;
    
    // Queue for when multiple new lines are added between reads.
    private readonly Queue<string> _queue = new();
    
    // Length of file after last read.
    private long _lastPosition;
    
    // True if there's something to read.
    public bool LinesAvailable
    {
        get 
        {
            if (_queue.Count > 0)
                return true;
            FileInfo fi = new(_filename);
            return fi.Exists && fi.Length != _lastPosition;
        }
    }
    
    public LineReader(string filename, bool readExistingLines = false)
    {
        _filename = filename;
        FileInfo fi = new(_filename);
        _lastPosition = readExistingLines || !fi.Exists ? 0 : fi.Length;
    }
    
    // Returns when a line is read or token is cancelled.
    public async Task<string?> WaitNextLineAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (_queue.TryDequeue(out string? queued))
                return queued;
            if (await GetNextLineAsync() is string line)
                return line;
            try
            {
                await Task.Delay(20, token);
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }
        return null;
    }
    
    // Read next line, or `null` if none available.
    public async Task<string?> GetNextLineAsync()
    {
        if (_queue.TryDequeue(out string? line))
            return line;
        
        // Check if file length has changed.
        FileInfo fi = new(_filename);
        if (!fi.Exists || fi.Length == _lastPosition)
            return null;
        
        // Open the stream. May fail if file is locked.
        Stream stream;
        try
        {
            stream = fi.OpenRead();
        }
        catch 
        {
            return null;
        }
        
        using (stream)
        {
            // If file is smaller assume it was cleared, read all lines.
            // Otherwise, go to our last position.
            if (stream.Length >= _lastPosition)
                stream.Position = _lastPosition;
            
            using StreamReader reader = new(stream);
            
            // Queue up new lines
            while (await reader.ReadLineAsync() is string next)
            {
                // Ignore empty lines.
                if (!string.IsNullOrEmpty(next))
                    _queue.Enqueue(next);
            }
        }
            
        // Update state and return the last read line if not empty.
        _lastPosition = stream.Length;
        _queue.TryDequeue(out line);
        return line;
    }
}

GetNextLineAsync()
尝试检索下一个可用行,
WaitNextLineAsync(...)
异步旋转,直到读取一行。取消退出而不是抛出,如果您想使用
CancelAfter
令牌提供超时,这会很方便:

async Task ProcessNextLineAsync(LineReader reader, CancellationToken token)
{
    CancellationTokenSource source = CancellationTokenSource.CreateLinkedTokenSource(token);
    source.CancelAfter(TimeSpan.FromSeconds(1));

    if (await LineReader.WaitNextLineAsync(source.Token) is string line)
    {
        // do something with the line
    }
}

(是的,我可以做一个更简单的版本。不过你要求

async
。)

注意:这是不是线程安全的。

© www.soinside.com 2019 - 2024. All rights reserved.