Observable.Using 和异步流获取损坏的数据

问题描述 投票:0回答:2

我有一个流,其目标是计算一组 .zip 文件中内容的简单“校验和”

为此,我设置了一个可观察的:

  1. 获取给定文件夹中的所有文件
  2. 读取每个文件的内容(读取为
    ZipArchive
  3. 对于每个文件中的每个条目,执行校验和
  4. 的计算

为了说明这一点,我创建了这个示例:

注意使用

AsyncContext.Run
(https://stackoverflow.com/a/9212343/1025407) 使
Main
方法等待
GetChecksum
,因为它是控制台应用程序

namespace DisposePoC
{
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Reactive.Linq;
    using Nito.AsyncEx;
    using System.Linq;
    using System.Threading.Tasks;


    class Program
    {
        private static void Main()
        {
            AsyncContext.Run(GetChecksums);
        }

        private static async Task<IList<byte>> GetChecksums()
        {
            var bytes = Directory.EnumerateFiles("FolderWithZips")
                .ToObservable()
                .SelectMany(path => Observable.Using(() => CreateZipArchive(path), archive => archive.Entries.ToObservable()))
                .SelectMany(entry => Observable.Using(entry.Open, stream => Observable.FromAsync(() => CalculateChecksum(stream, entry.Length))));

            return await bytes.ToList();
        }

        private static ZipArchive CreateZipArchive(string path)
        {
            return new ZipArchive(new FileStream(path, FileMode.Open, FileAccess.Read));
        }

        private static async Task<byte> CalculateChecksum(Stream stream, long entryLength)
        {
            var bytes = await GetBytesFromStream(stream, entryLength);
            return bytes.Aggregate((b1, b2) => (byte) (b1 ^ b2));
        }

        private static async Task<byte[]> GetBytesFromStream(Stream stream, long entryLength)
        {
            byte[] bytes = new byte[entryLength];
            await stream.ReadAsync(bytes, 0, (int)entryLength);
            return bytes;            
        }
    }
}

运行应用程序,我收到各种错误:

“System.IO.InvalidDataException”:本地文件头已损坏。 “System.NotSupportedException”:流不支持读取。 “System.ObjectDisposeException”:无法访问已处置的对象。 'System.IO.InvalidDataException':块长度与其补码不匹配。

我做错了什么?

observable 本身有问题还是因为

ZipArchive
不是线程安全的?如果不是,我该如何使代码工作?

c# .net reactive-programming system.reactive c#-ziparchive
2个回答
2
投票

Rx 可能不是最适合这个的。老实说,你甚至可以在没有异步的情况下做到这一点。

Directory.EnumerateFiles("FolderWithZips")
         .AsParallel()
         .Select(folder => CalculateChecksum(folder))
         .ToList()

1
投票

您的问题似乎没有任何“Rx”。

如果你将整个事情修改为一组命令式循环,它就可以正常工作

private static async Task<IList<byte>> GetChecksums()
{
    var bytes = new List<byte>();
    foreach (var path in Directory.EnumerateFiles("FolderWithZips"))
    {
        using (var archive = CreateZipArchive(path))
        {
            foreach (var entry in archive.Entries)
            {
                using (var stream = entry.Open())
                {
                    var checksum = await CalculateChecksum(stream, entry.Length);
                    bytes.Add(checksum);
                }
            }
        }
    }

    return bytes;
}

所以我想你有一组竞争条件(并发)和/或无序处理问题。

© www.soinside.com 2019 - 2024. All rights reserved.