我正在尝试在 mpi RMA 中实现无锁分布式链表。
这里是插入函数的代码,在给定键的节点后插入一个新节点。
/*
 * insertAfter: insert a freshly allocated node (id, newVal) immediately
 * after the first node whose id equals `key`, using passive-target MPI RMA
 * on a dynamic window.  The caller must already hold an access epoch
 * (MPI_Win_lock_all) on `win`.  A remote pointer is a (rank, disp) pair;
 * disp is the absolute address the owner obtained via MPI_Get_address
 * (dynamic-window convention).
 *
 * NOTE(review): the successor pointer is published NON-atomically — the
 * CAS below swaps only next.rank (one MPI_INT), while next.disp is
 * replaced by a separate MPI_Accumulate after a further flush.  A
 * concurrent reader that loads the node between those two operations sees
 * a torn pointer (new rank, OLD disp); that displacement belongs to a
 * different process's attached memory, which is exactly the
 * "requested displacement ... outside the RMA window" failure observed
 * with more than two processes.
 *
 * NOTE(review): the new node becomes reachable (its rank is CAS'd into the
 * predecessor) BEFORE its own next.{rank,disp} fields are written, so a
 * reader can observe next = {0, 0} on the new node.
 */
void insertAfter(int id, int newVal, int key, int rank, nodePtr head, MPI_Win win)
{
/* tmp, tmp3 and test are never used; fetched serves only as a dummy origin. */
nodePtr newNode, curNodePtr = head, next, tmp2 = { 0 }, fetched = { 0 }, tmp3 = { 0 };
node tmp, curNode = { 0 }, test;
int done = 0;
/* Allocate the new element locally and attach it to the dynamic window. */
newNode.rank = rank;
newNode.disp = allocElem(id, newVal, win);
/* Walk the list until inserted or the null pointer (-1 / MPI_BOTTOM) is reached. */
while (done == 0 && curNodePtr.rank != -1 && curNodePtr.disp != (MPI_Aint)MPI_BOTTOM) {
/* Atomically read the remote node (MPI_NO_OP get-accumulate).
 * NOTE(review): MPI guarantees atomicity only per basic datatype, so this
 * sizeof(node)-byte MPI_BYTE read can still be torn against the
 * element-wise accumulates performed below by other ranks. */
MPI_Get_accumulate(NULL, 0, MPI_BYTE, (void*)&curNode, sizeof(node),
MPI_BYTE, curNodePtr.rank, curNodePtr.disp, sizeof(node), MPI_BYTE, MPI_NO_OP, win);
MPI_Win_flush(curNodePtr.rank, win);
if (curNode.id == key) {
/* Key found: try to splice newNode between curNode and its successor. */
next = curNode.next;
do{
/* CAS on the successor's rank field: expect next.rank, install newNode.rank.
 * NOTE(review): (MPI_Aint)&(((node*)disp)->next.rank) computes
 * disp + offsetof(node, next.rank) via a fake dereference; the portable
 * spelling is MPI_Aint_add(disp, offsetof(node, next)). */
MPI_Compare_and_swap((void*)&newNode.rank, (void*)&next.rank, (void*)&tmp2.rank,
MPI_INT, curNodePtr.rank, (MPI_Aint) & (((node*)curNodePtr.disp)->next.rank), win);
MPI_Win_flush(curNodePtr.rank, win);
if (tmp2.rank == next.rank) {
/* CAS "succeeded" — NOTE(review): only the rank matched; a different
 * node owned by the same rank would pass this test as well.
 * First point the new node at the old successor... */
MPI_Accumulate((void*) & next.rank, 1, MPI_INT, newNode.rank,
(MPI_Aint) & (((node*)newNode.disp)->next.rank), 1, MPI_INT,
MPI_REPLACE, win);
MPI_Accumulate((void*) & next.disp, 1, MPI_AINT, newNode.rank,
(MPI_Aint) & (((node*)newNode.disp)->next.disp), 1, MPI_AINT,
MPI_REPLACE, win);
MPI_Win_flush(newNode.rank, win);
/* ...then complete the predecessor's pointer with the new disp
 * (the rank half was already swung by the CAS above — see the torn
 * pointer NOTE in the function header). */
MPI_Accumulate(&newNode.disp, 1, MPI_AINT, curNodePtr.rank,
(MPI_Aint) & (((node*)curNodePtr.disp)->next.disp), 1, MPI_AINT,
MPI_REPLACE, win);
MPI_Win_flush(curNodePtr.rank, win);
done = 1;
}
else {
/* CAS lost: spin until the successor's disp changes, then retry.
 * NOTE(review): after this loop tmp2 mixes the rank returned by the
 * failed CAS with a disp fetched later — that (rank, disp) pair may
 * never have existed together in the list. */
do {
MPI_Fetch_and_op((void*)&fetched.disp, (void*)&tmp2.disp, MPI_AINT, curNodePtr.rank,
(MPI_Aint) & (((node*)curNodePtr.disp)->next.disp), MPI_NO_OP, win);
MPI_Win_flush(curNodePtr.rank, win);
} while (tmp2.disp == next.disp);
}
next = tmp2;
} while (done != 1);
}
/* Advance using the successor captured in the struct read above. */
curNodePtr = curNode.next;
}
}
这个函数在列表中找到正确的位置并尝试插入一个新节点。当我用 mpiexec -n 1 启动程序时一切正常;当进程数大于 2 时程序无法运行,报错"请求的位移指定了 RMA 窗口之外的内存";而当进程数恰好为 2 时程序可以运行,但当我用下面这个函数打印列表时:
/*
 * printList: walk the list from `head`, printing every node's contents.
 * Each node is read from its owner under a per-node shared lock.
 *
 * Fixes vs. the original:
 *  - MPI_Get is nonblocking; the origin buffer may not be read until the
 *    access epoch is closed (MPI_Win_unlock or a flush).  The original
 *    printf'd curNode BEFORE MPI_Win_unlock, so nodes owned by remote
 *    ranks showed stale zeros for next.rank / next.disp (exactly the
 *    nextR = nextD = 0 symptom on rank-1 nodes).  The unlock now precedes
 *    any use of curNode.
 *  - MPI_Aint was printed with %x, which is undefined behavior when
 *    MPI_Aint is wider than int (typical on LP64); displacements are now
 *    cast to unsigned long and printed with %lx.
 */
void printList(int procid, nodePtr head, MPI_Win win)
{
    nodePtr curNodePtr = head;
    printf("Rank[%d]: Result list is: \n", procid);
    while (curNodePtr.disp != nullPtr.disp) {
        node curNode = { 0 };
        MPI_Win_lock(MPI_LOCK_SHARED, curNodePtr.rank, 0, win);
        MPI_Get(&curNode, sizeof(node), MPI_BYTE,
            curNodePtr.rank, curNodePtr.disp, sizeof(node), MPI_BYTE, win);
        /* Close the epoch first: only now is curNode guaranteed complete. */
        MPI_Win_unlock(curNodePtr.rank, win);
        printf("id %d: val %d was inserted by rank %d at displacement %lx marked %d nextR %d nextD %lx"
            "\n", curNode.id, curNode.val, curNodePtr.rank, (unsigned long)curNodePtr.disp,
            curNode.marked, curNode.next.rank, (unsigned long)curNode.next.disp);
        curNodePtr = curNode.next;
    }
}
输出显示,由 rank 1 进程添加的节点上 nextR = nextD = 0(而由 rank 0 插入的节点具有有效的 nextR 和 nextD 值)。
这里是 allocElem 的代码
/*
 * allocElem: allocate and initialize one list node, attach it to the
 * dynamic window `win`, record it in the global allocNodes registry
 * (grown in chunks of 100 so main can MPI_Free_mem everything at exit),
 * and return its absolute displacement as obtained by MPI_Get_address.
 * Returns (MPI_Aint)-1 on allocation failure.
 *
 * Fixes vs. the original:
 *  - The registry was grown AFTER MPI_Win_attach, so a realloc failure
 *    returned -1 while leaving the node attached, untracked and leaked
 *    (and the caller treated -1 as a valid disp).  Capacity is now
 *    ensured before anything irreversible happens.
 *  - MPI_Alloc_mem's return code was ignored; it is now checked.
 *  - realloc goes through a local temporary instead of a global.
 */
MPI_Aint allocElem(int id, int val, MPI_Win win) {
    MPI_Aint disp;
    node* allocNode = NULL;

    /* Grow the bookkeeping array first: failing here leaves nothing to undo. */
    if (allocNodeCount == allocNodeSize) {
        node** grown = (node**)realloc(allocNodes, (allocNodeSize + 100) * sizeof(node*));
        if (grown == NULL) {
            printf("Error while allocating memory!\n");
            return -1;
        }
        allocNodes = grown;
        allocNodeSize += 100;
    }

    if (MPI_Alloc_mem(sizeof(node), MPI_INFO_NULL, &allocNode) != MPI_SUCCESS) {
        printf("Error while allocating memory!\n");
        return -1;
    }
    allocNode->id = id;
    allocNode->val = val;
    allocNode->next = nullPtr;   /* not linked into the list yet */
    allocNode->marked = 0;

    /* Expose the node through the dynamic window and track it for cleanup. */
    MPI_Win_attach(win, allocNode, sizeof(node));
    allocNodes[allocNodeCount] = allocNode;
    allocNodeCount++;

    /* For dynamic windows the displacement is the absolute address. */
    MPI_Get_address(allocNode, &disp);
    return disp;
}
这是 main 函数中的相关代码:
/* Every rank performs its 10 insertions inside one passive-target epoch
 * spanning all ranks. */
MPI_Win_lock_all(0, win);
for (int i = 0; i < 10; i++) {
insertAfter(2,3, -1, procid, head, win);
}
MPI_Win_unlock_all(win);
/* Ensure all ranks have finished inserting before rank 0 walks the list. */
MPI_Barrier(MPI_COMM_WORLD);
if (procid == 0) printList(procid, head, win);
MPI_Barrier(MPI_COMM_WORLD);
/* Free the window before releasing the node buffers that were attached to it. */
MPI_Win_free(&win);
for (int i = 0; i < allocNodeCount; i++) MPI_Free_mem(allocNodes[i]);
MPI_Finalize();