Nested async tasks in .NET


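I'm writing a script to extend one of my email datasets with some features that I have to compute by calling external APIs. I have to use C#, which is not a language I'm very confident in.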

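Specifically, I have N emails (the emails array), and each email has a mailID and a set of properties contained in a MailData object. For each email I need to call 3 async methods (ComputeAttachmentsFeatures, ComputeVirusTotalFeatures, ComputeHeaderFeatures), which call external APIs and add some of the retrieved data to the mail object.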

In addition, each mail object has 0 or more URLs in its MailURLs property. For each URL I have to call another 3 async methods (ComputeDNSFeatures, ComputeWhoIsFeatures, ComputePageRankFeatures).

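What I expect is that, in each iteration, the first 3 tasks (ComputeAttachmentsFeatures, ComputeVirusTotalFeatures, ComputeHeaderFeatures) run in parallel with the 3 tasks for each URL (ComputeDNSFeatures, ComputeWhoIsFeatures, ComputePageRankFeatures). Given how I wrote the code below, I expect at most 6 tasks to be running in parallel at any time.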

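The code below does not work correctly: not all tasks are properly awaited, and the iteration moves on to the next mail object before they have completed. I tried using the Task.WaitAll(Task[]) method. What am I doing wrong?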

foreach ((long mailID, MailData mail) in emails)
{
    Logger.Info("Mail " + mailID);
    Task[] mailTasks =
    {
        Task.Run(async () => await mail.ComputeAttachmentsFeatures(client_Attachments, mailID)),
        Task.Run(async () => await mail.ComputeVirusTotalFeatures(client_VT, mailID)),
        Task.Run(async () => await mail.ComputeHeaderFeatures(client_Headers, mailID))
    };
    // For each URL in the email, call the APIs
    foreach (var url in mail.MailURLs)
    {
        if (IsValidURL(url.FullHostName))
        {
            Task[] urlTasks =
            {
                Task.Run(async () => await url.ComputeDNSFeatures(mailID)), // Task.Factory.StartNew
                Task.Run(async () => await url.ComputeWhoIsFeatures(mailID)),
                Task.Run(async () => await url.ComputePageRankFeatures(client_PageRank, mailID))
            };
            Task.WaitAll(urlTasks);
        }
    }
    Task.WaitAll(mailTasks);
    Logger.Info("Mail completed " + mailID);
}

As an example, here is one of the async functions being called:

        public async Task<int> ComputeHeaderFeatures(HttpClient client, long mailId)
        {

            //  Blacklists check of the traversed mailservers  -n_smtp_servers_blacklist-
            n_smtp_servers_blacklist = 0;
            foreach (string mail_server in ServersInReceivedHeaders)
            {
                if (!string.IsNullOrEmpty(mail_server) && Program.IsValidURL(mail_server))  // Only analyze it if it's a valid URL or IP
                {
                    // API call to check the mail_server against more than 100 blacklists
                    BlacklistURL alreadyAnalyzedURL = (BlacklistURL)Program.BlacklistedURLs.Find(mail_server);    // Checks if the IP has already been analyzed
                    if (alreadyAnalyzedURL == null)
                    {
                        BlacklistURL blacklistsResult = new BlacklistURL(mail_server);
                        while (true)
                        {
                            try
                            {
                                int statusCode = await BlacklistURL_API.PerformAPICall(blacklistsResult, client);

                                if (statusCode == 429) // Rate Limit Hit
                                {
                                    Program.Logger.Debug($"BlackListChecker - Rate limit hit for mail {mailId} - {mail_server}, will retry after {CapDelayBlackListChecking} s");
                                    Thread.Sleep(CapDelayBlackListChecking * 1000);
                                    continue;
                                }
                                if (statusCode == 503)  // Service temporarily unavailable
                                {
                                    Program.Logger.Debug($"BlackListChecker - Service temporarily Unavailable (503 error), will retry after {DefaultDelay503} s");
                                    Thread.Sleep(DefaultDelay503 * 1000);
                                    continue;  // try again later
                                }
                                Program.BlacklistedURLs.Add(blacklistsResult);  // Adds the server and its result to the list of already analyzed servers
                                if (blacklistsResult.GetFeature() > 0) { n_smtp_servers_blacklist++; }  // If the server appears in at least 1 blacklist, we count it as malicious

                                Program.Logger.Debug($"BlackListChecker call for mail {mailId} - {mail_server} responded with status code {statusCode}");
                            }
                            catch (Exception ex)
                            {
                                Program.Logger.Error($"BlackListChecker - An exception occurred for mail {mailId} - {mail_server}:\n{ex}");
                                throw;
                            }
                            finally
                            {
                                Program.Logger.Debug($"BlackListChecker - Wait {DefaultGapBetweenCallsBlackListChecking} s before next call...");
                                Thread.Sleep(DefaultGapBetweenCallsBlackListChecking * 1000);
                            }
                            // Break the inner loop to proceed with the next argument
                            break;
                        }
                    }
                    else  // The mailserver has already been analyzed, so we take the available result
                    {
                        if (alreadyAnalyzedURL.NBlacklists > 0) { n_smtp_servers_blacklist++; }
                    }
                }
            }
            return 1;
        }
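Note that the function above pauses with Thread.Sleep, which blocks the calling thread for the whole wait, whereas awaiting Task.Delay pauses an async method without blocking. The following is only a minimal sketch of the same retry-on-429/503 idea under stated assumptions: CallWithRetryAsync, the callApi delegate, the retryDelay parameter and the maxAttempts cap are all hypothetical and do not appear in the original code (the original retries indefinitely), and callApi merely stands in for BlacklistURL_API.PerformAPICall.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Retries while the (hypothetical) API reports 429 or 503, pausing between
    // attempts with Task.Delay so no thread is blocked during the wait.
    // Returns the last status code seen. maxAttempts is an assumption added
    // here; the original loop has no upper bound.
    public static async Task<int> CallWithRetryAsync(
        Func<Task<int>> callApi, TimeSpan retryDelay, int maxAttempts)
    {
        int statusCode = 0;
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            statusCode = await callApi();
            if (statusCode != 429 && statusCode != 503)
            {
                return statusCode; // Anything else is treated as final
            }
            Console.WriteLine($"Status {statusCode} on attempt {attempt}, retrying");
            await Task.Delay(retryDelay); // Non-blocking replacement for Thread.Sleep
        }
        return statusCode;
    }

    public static async Task Main()
    {
        int calls = 0;
        // Fake API for demonstration: rate-limited twice, then succeeds.
        Func<Task<int>> fakeApi = () => Task.FromResult(++calls < 3 ? 429 : 200);

        int status = await CallWithRetryAsync(fakeApi, TimeSpan.FromMilliseconds(100), 5);
        Console.WriteLine($"Final status {status} after {calls} calls");
    }
}
```

With the fake API shown, the third call returns 200, so the loop ends after two retries. Whether a cap on attempts is appropriate depends on how the external service behaves, which the original post does not say.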
c# .net concurrency task parallel.foreach
1 Answer

Try this, calling the async methods directly instead of wrapping each one in Task.Run:

foreach ((long mailID, MailData mail) in emails)
{
    Logger.Info("Mail " + mailID);
    Task[] mailTasks =
    {
        mail.ComputeAttachmentsFeatures(client_Attachments, mailID),
        mail.ComputeVirusTotalFeatures(client_VT, mailID),
        mail.ComputeHeaderFeatures(client_Headers, mailID)
    };
    // For each URL in the email, call the APIs
    foreach (var url in mail.MailURLs)
    {
        if (IsValidURL(url.FullHostName))
        {
            Task[] urlTasks =
            {
                url.ComputeDNSFeatures(mailID), // Task.Factory.StartNew
                url.ComputeWhoIsFeatures(mailID),
                url.ComputePageRankFeatures(client_PageRank, mailID)
            };
            Task.WaitAll(urlTasks);
        }
    }
    Task.WaitAll(mailTasks);
    Logger.Info("Mail completed " + mailID);
}
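A possible variation on the loop above, offered as a sketch rather than a verified fix: Task.WaitAll blocks the calling thread, so if the caller can itself be async, the same structure can be written with await Task.WhenAll. In the example below, FakeApiCall, ProcessMailAsync, the delays and the sample data are hypothetical stand-ins for the Compute*Features methods and the emails collection, which are not shown in full in the question. As in the loop above, URLs are handled one after another, so at most 6 calls (3 per mail plus 3 for the current URL) are in flight at once.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllSketch
{
    // Hypothetical stand-in for the Compute*Features methods: simulates an
    // API call with a non-blocking delay and returns 1, like the original.
    public static async Task<int> FakeApiCall(string name, long mailId, int delayMs)
    {
        await Task.Delay(delayMs);
        Console.WriteLine($"{name} finished for mail {mailId}");
        return 1;
    }

    // Processes one mail and returns the number of API calls that completed.
    public static async Task<int> ProcessMailAsync(long mailId, IEnumerable<string> urls)
    {
        // Calling an async method starts it; no Task.Run wrapper is needed.
        Task<int>[] mailTasks =
        {
            FakeApiCall("Attachments", mailId, 300),
            FakeApiCall("VirusTotal", mailId, 200),
            FakeApiCall("Headers", mailId, 100)
        };

        int completed = 0;
        foreach (string url in urls)
        {
            // The three calls for this URL run concurrently with each other
            // and with any mail-level calls that are still in flight.
            int[] urlResults = await Task.WhenAll(
                FakeApiCall($"DNS {url}", mailId, 50),
                FakeApiCall($"WhoIs {url}", mailId, 80),
                FakeApiCall($"PageRank {url}", mailId, 30));
            completed += urlResults.Sum();
        }

        // Do not return until the mail-level calls have finished as well.
        int[] mailResults = await Task.WhenAll(mailTasks);
        return completed + mailResults.Sum();
    }

    public static async Task Main()
    {
        var emails = new List<(long MailId, string[] Urls)>
        {
            (1L, new[] { "example.com", "example.org" }),
            (2L, new string[0])
        };

        foreach (var (mailId, urls) in emails)
        {
            Console.WriteLine($"Mail {mailId}");
            int calls = await ProcessMailAsync(mailId, urls);
            Console.WriteLine($"Mail completed {mailId} ({calls} calls)");
        }
    }
}
```

Because each mail is awaited before the next iteration starts, "Mail completed" is only logged once every call for that mail has finished. This only helps if the called methods are genuinely asynchronous; a method that blocks internally will still hold up its caller.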