因此,对于uni,我们必须完成这项作业,我们必须并行地串行实现高峰时段求解器。求解器使用广度优先搜索 (BFS) 实现。
这是默认 BFS 实现的一部分:
// Initialize empty queue
Queue<Tuple<byte[], Solution>> q = new Queue<Tuple<byte[], Solution>>();
// By default, the solution is "no solution"
foundSolution = new NoSolution();
// Place the starting position in the queue
q.Enqueue(Tuple.Create(vehicleStartPos, (Solution)new EmptySolution()));
AddNode(vehicleStartPos);
// Do BFS
while (q.Count > 0)
{
Tuple<byte[], Solution> currentState = q.Dequeue();
// Generate sucessors, and push them on to the queue if they haven't been seen before
foreach (Tuple<byte[], Solution> next in Sucessors(currentState))
{
// Did we reach the goal?
if (next.Item1[targetVehicle] == goal)
{
q.Clear();
foundSolution = next.Item2;
break;
}
// If we haven't seen this node before, add it to the Trie and Queue to be expanded
if(!AddNode(next.Item1))
q.Enqueue(next);
}
}
Console.WriteLine(foundSolution);
Console.ReadLine();
我设法将其变成这样的并行:
ConcurrentQueue<Tuple<byte[], Solution>> q = new ConcurrentQueue<Tuple<byte[], Solution>>();
foundSolution = new NoSolution();
q.Enqueue(Tuple.Create(vehicleStartPos, (Solution)new EmptySolution()));
AddNode(vehicleStartPos);
while (q.Count > 0 && !solutionFound)
{
Tuple<byte[], Solution> currentState;
q.TryDequeue(out currentState);
Parallel.ForEach(Sucessors(currentState), (next) =>
{
// Did we reach the goal?
if (next.Item1[targetVehicle] == goal)
{
solutionFound = true;
foundSolution = next.Item2;
return;
}
// If we haven't seen this node before, add it to the Trie and Queue to be expanded
if (!AddNode(next.Item1))
q.Enqueue(next);
});
}
如您所见,我尝试使用
ConcurrentQueue
实现并行 foreach 循环。我感觉并发队列工作得很好,但是它会自动锁定,因此花费了太多时间,使得这种并行实现方式比串行实现方式慢。
我正在考虑有一个无等待或至少无锁队列,这样我可以节省一点时间,但我不知道如何实现这样的事情。你们能否深入了解这是否可行以及是否比使用常规队列更快?或者也许使用不同的并发数据结构来更好地适应情况。不确定 ConcurrentBag 等是否适合。您能解释一下吗?
此外,在搜索并行 BFS 实现后,我找不到任何实现。对于像我这样想要并行实现 BFS 的人来说,有哪些一般技巧和提示?队列有哪些好的替代方案可以使其线程安全?
编辑1: 我成功地执行了这样的任务:
int taskNumbers = Environment.ProcessorCount;
Task[] tasks = new Task[taskNumbers];
// Set up the cancellation token
ctSource = new CancellationTokenSource();
for (int i = 0; i < taskNumbers; i++)
tasks[i] = new Task(() =>
{
try{ Traverse(); }
catch{ }
},
ctSource.Token);
for (int i = 0; i < taskNumbers; i++)
tasks[i].Start();
Task.WaitAll(tasks);
ctSource.Dispose();
他们调用了一个遍历方法,如下所示:
private static void Traverse()
{
ctSource.Token.ThrowIfCancellationRequested();
while (q.Count > 0)
{
Tuple<byte[], Solution> currentState;
if (q.TryDequeue(out currentState))
{
foreach (Tuple<byte[], Solution> next in Sucessors(currentState))
{
// Did we reach the goal?
if (next.Item1[targetVehicle] == goal)
{
ctSource.Cancel();
foundSolution = next.Item2;
return;
}
// If we haven't seen this node before, add it to the Trie and Queue to be expanded
if (!AddNode(next.Item1))
q.Enqueue(next);
}
}
if (ctSource.IsCancellationRequested)
ctSource.Token.ThrowIfCancellationRequested();
}
}
但是,我无法弄清楚遍历方法中 while 循环的条件。当前的情况允许任务过早退出循环。据我所知,我没有所有可用节点的完整列表,因此我无法将访问的树与所有节点的列表进行比较。除此之外,我没有任何其他想法如何让任务在 while 循环中循环,直到找到答案或直到没有更多新节点。你们能帮帮我吗?
感谢@Brian Malehorn 迄今为止的帮助,我设法使并行 BFS 版本的性能几乎等于串行版本的性能。我认为我现在需要的就是让任务停留在 while 循环中。
问题不在于你的队列,问题在于你并行化了错误的东西。当您应该并行化 Sucessors()
调用时,您正在并行化
添加后继者到队列中。
也就是说,
Sucessors()
只能从工作线程中调用,而不能在“主”线程中调用。
例如,假设
Sucessors()
需要 1 秒才能运行,而您正在搜索这棵树:
o
/ \
/ \
o o
/ \ / \
o o o o
搜索这棵树的最快时间是 3 秒。您的代码需要多长时间?