我有 600 万个小文件(平均大小约为 15 字节),需要由处理程序逐个读取并做进一步处理。我之前使用 Task.Factory 实现了此功能,它在 ASP.NET Core 2.1 上运行没有问题,整个过程大约耗时 20 个小时。
我现在已将应用程序迁移到 ASP.NET Core 6。在测试服务器上,启动这些文件操作之后,我的 Web 应用程序就不再响应任何请求。在日志中我看到错误
System.OutOfMemoryException
。
我想我的实现方式很不理想。对于这项工作,您能建议哪些更合适的多线程实现方式?
来自控制器的方法
ImportSignatures
:
/// <summary>
/// Runs the signing-certificate import synchronously and returns its outcome as JSON.
/// </summary>
/// <returns>
/// The serialized <c>ImportSigningCertsResult</c> produced by the import service, or, when the
/// import throws, a result built from the exception message and the service's cancellation flag.
/// </returns>
[HttpPost("ImportSignatures")]
public JsonResult ImportSignatures()
{
    try
    {
        ImportSigningCertsResult outcome = SignatureImportService.ImportSigningCerts();
        return Json(outcome);
    }
    catch (Exception ex)
    {
        // Any failure is logged and reported to the caller as a regular JSON payload.
        LogsHelper.WriteLog("api/Settings/ImportSignatures", ex);
        var failure = new ImportSigningCertsResult(ex.Message, SignatureImportService.WasCancelled);
        return Json(failure);
    }
}
方法
ImportSigningCerts
:
/// <summary>
/// Imports signing certificates from every "*.sig" file in the configured directory into the
/// "Main" store, starting one task per file and blocking until all of them have finished.
/// Roughly every 100000 processed files the store is written and the files collected for
/// deletion are handed to <c>StartRemovingSignatures</c>.
/// </summary>
/// <returns>
/// A result carrying the summary text (<c>ResultStr</c>) and, when <c>s_tokenSource</c> is not
/// null, the <c>WasCancelled</c> flag.
/// </returns>
public static ImportSigningCertsResult ImportSigningCerts()
{
LogsHelper.WriteEventLog("Launching SignatureImportService");
// Reset the static progress/state fields for this run.
WasCancelled = false;
IsWorking = true;
ResultStr = "";
totalSignatures = 0;
processedSignatures = 0;
var cancelMsg = "Certificate import was interrupted. \n";
var endMsg = "Certificate import completed successfully. \n";
// Paths of successfully processed files; guarded by s_toDeleteListLockObj.
var toDelete = new List<string>();
try
{
var configuration = SignatureImportConfiguration.FromCfg();
using (s_tokenSource = new CancellationTokenSource())
{
// NOTE(review): ToList() materializes every matching path before any work starts.
List<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig").ToList();
totalSignatures = signatures.Count;
Store mainStore = StoreMan.GetStore("Main");
var importStats = new ImportStats();
var tasks = new List<Task>();
// Number of the next intermediate save; read and incremented only under s_intermediateSaveLockObj.
int saveIndex = 1;
const int proccessedForSave = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
CancellationToken token = s_tokenSource.Token;
int minWorkerThreads, minCompletionPortThreads, maxWorkerThreads, maxCompletionPortThreads;
ThreadPool.GetMinThreads(out minWorkerThreads, out minCompletionPortThreads);
ThreadPool.GetMaxThreads(out maxWorkerThreads, out maxCompletionPortThreads);
// Caps worker threads at twice the current minimum; the previous maximum is not restored in this method.
// NOTE(review): the thread pool is shared by the whole process, so this cap also applies to
// request handling in the hosting web application — confirm this is intended.
ThreadPool.SetMaxThreads(minWorkerThreads * 2, maxCompletionPortThreads);
// NOTE(review): one Task is created per file up front and every Task stays referenced by
// `tasks` until WaitAll returns, so memory use grows with the number of files.
signatures.ForEach(path =>
{
tasks.Add(Task.Factory.StartNew(() =>
{
token.ThrowIfCancellationRequested();
// Here reading of a current file and uploading the necessary certificates to the store from it
if (UploadSigningCerts(mainStore, path, importStats))
{
if (configuration.NeedCleaning)
{
lock (s_toDeleteListLockObj)
toDelete.Add(path);
}
}
// Here intermediate store's saving and deleting proccessed files
lock (s_intermediateSaveLockObj)
{
// Fires once each time the processed count passes the next multiple of proccessedForSave.
if (++processedSignatures > proccessedForSave * saveIndex)
{
LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
saveIndex++;
}
}
}, token));
});
try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ae)
{
// Log every task failure except cancellations.
foreach (Exception e in ae.InnerExceptions)
{
if (e is not TaskCanceledException)
LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
}
}
// Final save and cleanup for whatever was processed after the last intermediate save.
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
}
LogsHelper.WriteEventLog(ResultStr);
// NOTE(review): s_tokenSource is assigned above and never set to null in this method;
// presumably another member clears it — verify, otherwise the first branch is unreachable.
return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
}
catch (Exception)
{
// Rethrows unchanged; this clause adds no handling of its own.
throw;
}
finally
{
IsWorking = false;
}
}
方法
UploadSigningCerts
:
/// <summary>
/// Reads one signature file, extracts the certificates it carries and processes each of them
/// against <paramref name="store"/> under <c>s_importLockObj</c>.
/// </summary>
/// <param name="store">Certificate store passed through to the per-certificate import step.</param>
/// <param name="path">Path of the signature file to read.</param>
/// <param name="importStats">Shared counters; <c>all</c> and <c>errors</c> are updated with Interlocked operations.</param>
/// <returns>
/// The value of <c>toBeDeleted</c> on success, or <c>false</c> when any exception was caught.
/// </returns>
private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
// Starts as true; the code shown here never changes it (the elided import step may — verify).
bool toBeDeleted = true;
CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
try
{
// The whole file is read into memory and parsed; the resulting certificates are copied into a list.
List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();
Interlocked.Add(ref importStats.all, certs.Count);
for (int i = 0; i < certs.Count; i++)
{
// A single lock object serializes the per-certificate step across all callers.
lock (s_importLockObj)
{
// Validating each certificate from a file, making an import decision, importing to the store...
}
}
return toBeDeleted;
}
catch (Exception e)
{
// Failures are logged and counted, never propagated to the caller.
LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
LogsHelper.WriteEventLog($"Error importing certificate from signature: {Path.GetFileName(path)};");
Interlocked.Increment(ref importStats.errors);
return false;
}
}
方法
StartRemovingSignatures
:
/// <summary>
/// Takes every path currently queued in <paramref name="toDelete"/>, empties the list and
/// deletes those files on a background task. Returns immediately; it does not wait for the
/// deletions to finish.
/// </summary>
/// <param name="toDelete">
/// Shared list of file paths to delete. It is read and cleared under <c>s_toDeleteListLockObj</c>.
/// </param>
private static void StartRemovingSignatures(List<string> toDelete)
{
    List<string> batch;
    lock (s_toDeleteListLockObj)
    {
        // The emptiness check is done under the same lock that guards additions, so the list
        // is never read while another thread is modifying it.
        if (toDelete.Count == 0)
        {
            return;
        }

        // Snapshot and clear atomically so that paths added later go into the next batch.
        batch = new List<string>(toDelete);
        toDelete.Clear();
    }

    LogsHelper.WriteEventLog("Deleting successfully processed signature files...");

    // Deliberate fire-and-forget: the task cannot fault because each deletion failure is
    // caught and logged individually.
    _ = Task.Run(() =>
    {
        foreach (string path in batch)
        {
            try
            {
                File.Delete(path);
            }
            catch (Exception e)
            {
                LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
            }
        }
    });
}
错误文本:
20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
at System.Collections.Generic.List`1.ForEach(Action`1 action)
at Store.Services.SignatureImportService.ImportSigningCerts()
at Store.Controllers.SettingsController.ImportSignatures()
我已经修复了错误
System.OutOfMemoryException
并且我想分享我的操作。要点:
ImportDeloWebSignatures
和 ImportSigningCerts
现在是异步的;使用 SemaphoreSlim
限制同时运行的任务数量;改变了原先 Task.WaitAll
的用法:任务列表现在最多包含 100 000 个任务,我用 await Task.WhenAll 等待这一批任务全部完成,然后对每个任务调用 Dispose。
现在我的代码如下所示。来自控制器的方法
ImportSignatures
:
/// <summary>
/// Awaits the signing-certificate import and returns its outcome as JSON.
/// </summary>
/// <returns>
/// The serialized <c>ImportSigningCertsResult</c> produced by the import service, or, when the
/// import throws, a result built from the exception message and the service's cancellation flag.
/// </returns>
[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
    try
    {
        return Json(await SignatureImportService.ImportSigningCerts());
    }
    catch (Exception ex)
    {
        // Any failure is logged and reported to the caller as a regular JSON payload.
        LogsHelper.WriteLog("api/Settings/ImportSignatures", ex);
        var failure = new ImportSigningCertsResult(ex.Message, SignatureImportService.WasCancelled);
        return Json(failure);
    }
}
方法
ImportSigningCerts
:
/// <summary>
/// Imports signing certificates from every "*.sig" file in the configured directory into the
/// "Main" store. Files are processed in batches of 100000; within a batch at most
/// <c>Environment.ProcessorCount * 2</c> files are in flight at once. After each batch the
/// store is written and the files collected for deletion are handed to
/// <c>StartRemovingSignatures</c>.
/// </summary>
/// <returns>
/// A result carrying the summary text (<c>ResultStr</c>) and, when <c>s_tokenSource</c> is not
/// null, the <c>WasCancelled</c> flag.
/// </returns>
/// <remarks>
/// Assumes the cancel path sets <c>WasCancelled</c> and cancels <c>s_tokenSource</c>; neither is
/// visible here — confirm against the member that requests cancellation.
/// </remarks>
public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
    LogsHelper.WriteEventLog("Launching SignatureImportService");
    WasCancelled = false;
    IsWorking = true;
    ResultStr = "";
    signaturesCount = 0;
    processedSignatures = 0;
    var cancelMsg = "Certificate import was interrupted. \n";
    var endMsg = "Certificate import completed successfully. \n";
    var toDelete = new List<string>();
    try
    {
        var configuration = SignatureImportConfiguration.FromCfg();
        using (s_tokenSource = new CancellationTokenSource())
        {
            string[] signatures = Directory.GetFiles(configuration.Path, "*.sig");
            signaturesCount = signatures.Length;
            Store mainStore = StoreMan.GetStore("Main");
            ImportStats importStats = new();
            const int partSize = 100000; // After what number of processed signatures to perform intermediate storage and deletion of signatures
            CancellationToken token = s_tokenSource.Token;
            using SemaphoreSlim semaphore = new(Environment.ProcessorCount * 2);

            // Walk the array in partSize-sized windows so that no more than partSize Task
            // objects are alive at once. Stepping by offset also avoids an empty trailing batch
            // when the file count is an exact multiple of partSize.
            for (int offset = 0; offset < signatures.Length; offset += partSize)
            {
                int end = offset + Math.Min(partSize, signatures.Length - offset);
                List<Task> tasks = new(end - offset);

                for (int index = offset; index < end; index++)
                {
                    if (WasCancelled || token.IsCancellationRequested)
                        break;

                    string path = signatures[index];

                    try
                    {
                        // Cancellable wait: the producer must not stay parked here once the
                        // import has been cancelled.
                        await semaphore.WaitAsync(token);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }

                    Task worker;
                    try
                    {
                        // The token is intentionally NOT passed to StartNew. A task cancelled
                        // before it starts never runs its delegate, so the finally block below
                        // would never return the permit. The delegate checks the token itself.
                        worker = Task.Factory.StartNew(() =>
                        {
                            try
                            {
                                if (token.IsCancellationRequested)
                                    return;

                                if (UploadSigningCerts(mainStore, path, importStats) && configuration.NeedCleaning)
                                {
                                    lock (s_toDeleteListLockObj)
                                        toDelete.Add(path);
                                }
                            }
                            catch (Exception e) when (e is not OperationCanceledException)
                            {
                                LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts:Task.Factory.StartNew", e);
                            }
                            finally
                            {
                                Interlocked.Increment(ref processedSignatures);
                                semaphore.Release();
                            }
                        });
                    }
                    catch (Exception e)
                    {
                        // The task was never created, so nobody else will return the permit.
                        semaphore.Release();
                        LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts", e);
                        Interlocked.Increment(ref importStats.errors);
                        continue;
                    }

                    tasks.Add(worker);
                }

                try
                {
                    await Task.WhenAll(tasks);
                }
                catch (OperationCanceledException) { }
                finally
                {
                    // Every task has completed once WhenAll finishes, so Dispose is safe here.
                    foreach (Task finished in tasks)
                        finished.Dispose();
                }

                LogsHelper.WriteEventLog("Intermediate saving of the certificate store...");
                mainStore.WriteIfChanged();
                StartRemovingSignatures(toDelete);

                if (WasCancelled || token.IsCancellationRequested)
                    break;
            }

            ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
        }

        LogsHelper.WriteEventLog(ResultStr);
        // NOTE(review): s_tokenSource is not set to null in this method; presumably another
        // member clears it — verify, otherwise the first branch is unreachable.
        return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
    }
    finally
    {
        IsWorking = false;
    }
}
其他方法保持不变。感谢所有写评论的人,你们引导我走上了正确的道路))