多线程环境中的 HttpClient SendAsync 与 ArchiveZip

问题描述 投票:0回答:1

下面是原始问题,但解决方案与使用 ZipArchive 类将多个流获取到多个文件并在多个线程中使用它们有关。真正的问题及其答案 //------------

我有一个相当复杂的案例...我正在从其他 API 调用我自己的 API。我一次这样做 6 次。在第一次调用期间,我需要获取访问令牌并将其缓存,以便所有其他请求都可以使用它。如果我首先同步调用,然后立即异步休息 5 个 - 一切正常。但同时出现所有 6 个 - 我收到来自 SendAsync 的错误(访问令牌正确生成):

The archive entry was compressed using an unsupported compression method.

如果我从代码中删除 SemaphoreSlim,并且所有 6 个调用都将自行生成令牌(因为所有调用都无法从缓存中获取它),那么它也可以工作...... 令牌的代码(此类是为每个请求单独创建的,这就是信号量是静态的原因):

public class AzureCredentialCached : ChainedTokenCredential
{
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    public override async ValueTask<AccessToken> GetTokenAsync(TokenRequestContext requestContext, CancellationToken cancellationToken = default)
    {
        AccessToken? cachedToken = _cacheService.TryGetToken(requestContext, _uri);
        if (cachedToken is null)
        {
            await _semaphore.WaitAsync(cancellationToken);
            try
            {
                cachedToken = _cacheService.TryGetToken(requestContext, _uri);
                if (cachedToken is null)
                {
                    cachedToken = await base.GetTokenAsync(requestContext, cancellationToken);
                    _cacheService.InsertNewToken((AccessToken)cachedToken, requestContext, _uri);
                }
            }
            finally
            {
                _semaphore.Release();
            }
        }

        return (AccessToken)cachedToken;
    }
}

委托处理者:

public class StorageApiAuthorizeRequestHandler : DelegatingHandler
{
    private readonly IAzureCredentialCacheService _credentialCacheService;
    private readonly GeneralSection _generalConfig;
    private readonly StorageApiSection _storageConfig;

    public StorageApiAuthorizeRequestHandler(IAzureCredentialCacheService credentialCacheService, IOptions<StorageApiSection> storageConfig, IOptions<GeneralSection> generalConfig)
    {
        _credentialCacheService = credentialCacheService;
        _generalConfig = generalConfig.Value;
        _storageConfig = storageConfig.Value;
    }

    protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
    {
        var credential = new AzureCredentialCached(_credentialCacheService, new Uri(_storageConfig.Address, UriKind.Absolute), new Azure.Core.TokenCredential[]
        {
            new ManagedIdentityCredential(_generalConfig.ManagedIdentityClientId),
            new VisualStudioCredential()
        });

        var scopes = new[] { _storageConfig.Scope };
        var accessToken = (await credential.GetTokenAsync(new Azure.Core.TokenRequestContext(scopes))).Token;

        request.Headers.Add(HeaderNames.Authorization, $"Bearer {accessToken}");

        try
        {
            return await base.SendAsync(request, cancellationToken);
        }
        catch (Exception ex)
        {
            throw;
        }
    }
}

The archive entry was compressed using an unsupported compression method.
 at System.IO.Compression.Inflater.Inflate(FlushCode flushCode)
   at System.IO.Compression.Inflater.ReadInflateOutput(Byte* bufPtr, Int32 length, FlushCode flushCode, Int32& bytesRead)
   at System.IO.Compression.Inflater.ReadOutput(Byte* bufPtr, Int32 length, Int32& bytesRead)
   at System.IO.Compression.Inflater.InflateVerified(Byte* bufPtr, Int32 length)
   at System.IO.Compression.DeflateStream.CopyToStream.<WriteAsyncCore>d__10.MoveNext()
   at System.IO.Stream.<<CopyToAsync>g__Core|29_0>d.MoveNext()
   at System.IO.Compression.DeflateStream.CopyToStream.<CopyFromSourceToDestinationAsync>d__6.MoveNext()
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable.ConfiguredValueTaskAwaiter.GetResult()
   at System.Net.Http.HttpContent.<<CopyToAsync>g__WaitAsync|56_0>d.MoveNext()
   at System.Net.Http.MultipartContent.<SerializeToStreamAsyncCore>d__23.MoveNext()
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable.ConfiguredValueTaskAwaiter.GetResult()
   at System.Net.Http.HttpContent.<<CopyToAsync>g__WaitAsync|56_0>d.MoveNext()
   at System.Net.Http.HttpConnection.<SendRequestContentAsync>d__70.MoveNext()
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable.ConfiguredValueTaskAwaiter.GetResult()
   at System.Net.Http.HttpConnection.<SendAsyncCore>d__64.MoveNext()
   at System.Net.Http.HttpConnection.<SendAsyncCore>d__64.MoveNext()
   at System.Net.Http.HttpConnectionPool.<SendWithVersionDetectionAndRetryAsync>d__84.MoveNext()
   at System.Net.Http.DiagnosticsHandler.<SendAsyncCore>d__8.MoveNext()
   at System.Threading.Tasks.ValueTask`1.get_Result()
   at System.Runtime.CompilerServices.ConfiguredValueTaskAwaitable`1.ConfiguredValueTaskAwaiter.GetResult()
   at System.Net.Http.RedirectHandler.<SendAsync>d__4.MoveNext()
   at Microsoft.Extensions.Http.Logging.LoggingHttpMessageHandler.<SendAsync>d__5.MoveNext()
   at MyAPP.ApiAuthentication.StorageApiAuthorizeRequestHandler.<SendAsync>d__4.MoveNext() in C:\MyAPP\ApiAuthentication\StorageApiAuthorizeRequestHandler.cs:line 37
c# httpclient c#-ziparchive
1个回答
0
投票

这个问题实际上与我发送的内容有关。我们使用 ZipArchive 类从 ZIP 存档中提取文件并将它们上传到我们的另一个 API。所以流程是这样的:

  1. 我正在打开 ZIP 格式文件的流

    var 条目 = _zipArchive.GetEntry(path); 入口。打开();

  2. 我正在请求中使用此流进行 SendAsync

  3. 打开另一个全新的流到另一个文件

  4. 我正在请求中使用此流执行 SendAsync ...

  5. 与此同时,第二个 API 启动并加载配置。它收到多个请求,但由于信号量,每个请求都在等待第一个请求从 Azure 加载 APP 配置

  6. 完成后,它会尝试同时从第一个 API 拉取流

  7. 看起来甚至来自 ZipArchive 库的不同流也不能同时使用。 SendAsync 抛出异常,因为我认为这些流是动态解压缩的并且以某种方式彼此相交。

解决方案是将所有这些流复制到单独的 MemoryStreams(并且由于它们首先被一个接一个解压缩)。现在所有 SendAsyncs 同时工作,没有任何问题......

© www.soinside.com 2019 - 2024. All rights reserved.