C#并行添加队列并侦听队列传播

问题描述 投票:0回答:2

我是多线程编程的新手。我有一个程序需要查询数据库,然后对返回的数据执行一些数据操作。由于我的组织结构,我必须单独调用数据库来检索单个用户的帐户信息。我的任务涉及收集数千个帐户的数据。

目前,我使用Parallel.ForEach()来查询数据库并将所有元素添加到ConcurrentList中。一旦从数据库返回所有数据,我就以同步方式执行我的操作。

除了任何明显的问题之外,我不喜欢的一件事就是在内存中保留一个大的列表,并且在冗长的数据库进程完成之前基本上被阻塞。我希望能够将数据推送到队列中,然后在添加数据后立即开始处理数据。消费过程不需要是并行或异步的。我只需要能够侦听何时添加到队列或队列不为空。

并行流程:

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(List<AccountInfo> accountList, string dbConnName)
    {
        logger.Info("Fetching Data");
        var concurrentCombinedData = new ConcurrentBag<CombinedAccountInfo>();
        Parallel.ForEach(accountList, new ParallelOptions { MaxDegreeOfParallelism = 5 }, r =>
        {
            try
            {
                var userPrefs = new List<UserPreference>().queryData(Queries.UserPrefQuery, dbConnName);

                concurrentCombinedData.Add(new CombinedAccountInfo()
                {
                    AccountName = r.AccountName,
                    AccountId = r.AccountId,
                    LastLoginDate = r.LastLoginDate,
                    AccountHandle = r.AccountHandle,
                    UserPreferences = userPrefs 
                });
            }
            catch (Exception e)
            {
                logger.Error(e);
            }
        });

        return concurrentCombinedTransaction;
    }

我已经对Dataflow做了一些阅读,并看了一些关于Reactive Extensions的文章。但是,我似乎可以找到任何更简单的多个生产者的例子,这些生产者都会投入到单个消如何更好地达到最终目标的任何建议或想法将不胜感激。

解决了

我将使用Scott Hannen提供的答案。因为操作很小而且不是非常密集,每个进程都可以处理它,而不是试图将所有内容都绑回到列表中。

c# .net queue task-parallel-library producer-consumer
2个回答
0
投票

如果你想从数据库中检索它,你想要对每个帐户进行操作,那么你可以做到这一点,而不是向ConcurrentBag<CombinedAccountInfo>添加元素。

public static ConcurrentBag<CombinedAccountInfo> GetAllAccountInfo(
    List<AccountInfo> accountList, 
    string dbConnName,
    Action<CombinedAccountInfo> doSomethingWithTheAccountInfo)

然后,当您从数据库中获取每个元素时,

doSomethingWithTheAccountInfo(accountInfo);

0
投票

虽然我真的相信您应该立即查询所有用户首选项,因为这样可以提高数据库的性能(实际上是BIG TIME),如果你想要这样的话:

public void Answer<T>(List<Guid> ids)
{
    var stack = new ConcurrentStack<T>();

    Parallel.ForEach(ids, (id) =>
    {
        T value = GetData<T>(id);

        stack.Push(value);
    });

    Parallel.For(0, ids.Count, (i) =>
    {
        T item;
        while (!stack.TryPop(out item))
        {
            // sleep
        }
        Process(item);
    });
}

但是我已经提过了,我想你不应该去那里?

© www.soinside.com 2019 - 2024. All rights reserved.