使用数百万个任务会导致错误 System.OutOfMemoryException (C#)

问题描述 投票:0回答:1

我有 600 万个小文件(平均大小约为 15 字节),需要使用处理器读取并进一步处理。我之前已经使用 Task.Factory 实现了此功能,并且它在 asp.net core 2.1 上运行没有问题。花了大约20个小时。

我现在已将应用程序迁移到 asp.net 6,并且在测试服务器上,我的 Web 应用程序在启动这些文件操作后停止响应任何请求。在日志中我看到错误

System.OutOfMemoryException

我想我的实现方式还很不理想。我想知道您可以建议哪些多线程实现这项工作的方法?

来自控制器的

方法

ImportSignatures

[HttpPost("ImportSignatures")]
public JsonResult ImportSignatures()
{
    try
    {
        return Json(SignatureImportService.ImportSigningCerts());
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
    }
}

方法

ImportSigningCerts

public static ImportSigningCertsResult ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Launching SignatureImportService");
    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    totalSignatures = 0;
    processedSignatures = 0;

    var cancelMsg = "Certificate import was interrupted. \n";
    var endMsg = "Certificate import completed successfully. \n";
    var toDelete = new List<string>();

    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();

        using (s_tokenSource = new CancellationTokenSource())
        {
            List<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig").ToList();
            totalSignatures = signatures.Count;

            Store mainStore = StoreMan.GetStore("Main");
            var importStats = new ImportStats();
            var tasks = new List<Task>();

            int saveIndex = 1;
            const int proccessedForSave = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
            CancellationToken token = s_tokenSource.Token;

            int minWorkerThreads, minCompletionPortThreads, maxWorkerThreads, maxCompletionPortThreads;
            ThreadPool.GetMinThreads(out minWorkerThreads, out minCompletionPortThreads);
            ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);
            ThreadPool.SetMaxThreads(minWorkerThreads * 2, maxCompletionPortThreads);

            signatures.ForEach(path =>
            {
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    token.ThrowIfCancellationRequested();

                    // Here reading of a current file and uploading the necessary certificates to the store from it
                    if (UploadSigningCerts(mainStore, path, importStats))
                    {
                        if (configuration.NeedCleaning)
                        {
                            lock (s_toDeleteListLockObj)
                                toDelete.Add(path);
                        }
                    }

                    // Here intermediate store's saving and deleting proccessed files
                    lock (s_intermediateSaveLockObj)
                    {
                        if (++processedSignatures > proccessedForSave * saveIndex)
                        {
                            LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");

                            mainStore.WriteIfChanged();
                            StartRemovingSignatures(toDelete);
                            saveIndex++;
                        }
                    }
                }, token));
            });

            try
            {
                Task.WaitAll(tasks.ToArray());
            }
            catch (AggregateException ae)
            {
                foreach (Exception e in ae.InnerExceptions)
                {
                    if (e is not TaskCanceledException)
                        LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
                }
            }
            mainStore.WriteIfChanged();
            StartRemovingSignatures(toDelete);
            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    catch (Exception)
    {
        throw;
    }
    finally
    {
        IsWorking = false;
    }
}

方法

UploadSigningCerts

private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
    bool toBeDeleted = true;
    CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;

    try
    {
        List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();

        Interlocked.Add(ref importStats.all, certs.Count);

        for (int i = 0; i < certs.Count; i++)
        {
            lock (s_importLockObj)
            {
                // Validating each certificate from a file, making an import decision, importing to the store...
            }
        }
        return toBeDeleted;
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
        LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
        Interlocked.Increment(ref importStats.errors);
        return false;
    }
}

方法

StartRemovingSignatures

private static void StartRemovingSignatures(List<string> toDelete)
{
    if (toDelete.Count > 0)
    {
        List<string> tempToDelete;
        lock (s_toDeleteListLockObj)
        {
            tempToDelete = new List<string>(toDelete);
            toDelete.Clear();
        }

        LogsHelper.WriteEventLog("Deleting successfully processed signature files...");

        Task.Factory.StartNew(() =>
        {
            tempToDelete.ForEach(path =>
            {
                try
                {
                    File.Delete(path);
                }
                catch (Exception e)
                {
                    LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
                }
            });
        });
    }
}

错误文本:

20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
   at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
   at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
   at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
   at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
   at System.Collections.Generic.List`1.ForEach(Action`1 action)
   at Store.Services.SignatureImportService.ImportSigningCerts()
   at Store.Controllers.SettingsController.ImportSignatures()
c# asp.net-mvc task out-of-memory .net-6.0
1个回答
0
投票

我已经修复了错误

System.OutOfMemoryException
并且我想分享我的操作。要点:

  • 我重构了代码,方法
    ImportDeloWebSignatures
    ImportSigningCerts
    现在是异步的;
  • 我没有同时运行所有文件的任务,而是使用
    SemaphoreSlim
    ;
  • 将运行次数限制为逻辑处理器数量的两倍
  • 我放弃了将所有任务累积在一个数组中然后将其传递给
    Task.WaitAll
    方法的想法。该数组现在包含不超过 100 000 个任务,然后我等待它并
    Dispose
    每个任务。

现在我的代码如下所示: 来自控制器的方法

ImportSignatures

[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
    try
    {
        ImportSigningCertsResult res = await SignatureImportService.ImportSigningCerts();
        return Json(res);
    }
    catch (Exception e)
    {
        LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
        return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
    }
}

方法

ImportSigningCerts

public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Launching SignatureImportService");

    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    signaturesCount = 0;
    processedSignatures = 0;

    var cancelMsg = "Certificate import was interrupted. \n";
    var endMsg = "Certificate import completed successfully. \n";
    var toDelete = new List<string>();

    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();

        using (s_tokenSource = new CancellationTokenSource())
        {
            string[] signatures = Directory.GetFiles(configuration.Path, "*.sig");
            signaturesCount = signatures.Length;

            Store mainStore = StoreMan.GetStore("Main");
            ImportStats importStats = new();

            const int partSize = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
            CancellationToken token = s_tokenSource.Token;
            using SemaphoreSlim semaphore = new(Environment.ProcessorCount * 2);

            for (int i = 0; i < signaturesCount / partSize + 1; i++)
            {
                List<Task> tasks = new();

                // We divide the total array into parts containing partSize elements so as not to accumulate a large array of tasks
                foreach (string path in signatures.Skip(i * partSize).Take(partSize))
                {
                    if (WasCancelled)
                        break;
                    try
                    {
                        await semaphore.WaitAsync();
                        tasks.Add(Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                token.ThrowIfCancellationRequested();

                                if (UploadSigningCerts(mainStore, path, importStats) && configuration.NeedCleaning)
                                {
                                    lock (s_toDeleteListLockObj)
                                        toDelete.Add(path);
                                }
                            }
                            catch (Exception e) when (e is not OperationCanceledException)
                            {
                                LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts:Task.Factory.StartNew", e);
                            }
                            finally
                            {
                                semaphore.Release();
                                Interlocked.Increment(ref processedSignatures);
                            }
                        }, token));
                    }
                    catch (Exception e)
                    {
                        LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
                        Interlocked.Increment(ref importStats.errors);
                    }
                }

                try
                {
                    await Task.WhenAll(tasks);
                }
                catch (OperationCanceledException) { }
                finally
                {
                    tasks.ForEach(t => t.Dispose());
                }

                LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");

                mainStore.WriteIfChanged();
                StartRemovingSignatures(toDelete);

                if (WasCancelled)
                    break;
            }
            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    finally
    {
        IsWorking = false;
    }
}

其他方法保持不变。感谢所有写评论的人,你们引导我走上了正确的道路))

© www.soinside.com 2019 - 2024. All rights reserved.