我正在尝试使用 MPI 实现图像中描述的算法。这是大学项目的一部分,我们正在该项目中构建分布式卫星到地面站通信系统。我已经对其进行了多次迭代,但当选出多个候选获胜者时,我永远无法打破平局。
这是处理选举过程的函数。每个地面站进程都会调用此函数(其通信组中有 5 个,GS 0、1、2、3 和 4),并通过来自协调器进程(排名 10)的
START_LELECT_GS
事件启动。
void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
MPI_Status status;
int is_leader = 0;
int received_count = 0;
int neighbor_received_from[num_of_neighbors];
printf("Process %d: Starting leader election\n", rank);
// Determine if the process is a leaf node
const int is_leaf = (num_of_neighbors == 1);
printf("Process %d: Is leaf? %d\n", rank, is_leaf);
// Send initial <ELECT> messages only if the process is a leaf node
if (is_leaf) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
}
// Receive and process messages from neighbors
while (1) {
int sender_rank;
MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
printf("Process %d: Received message from %d with tag %d\n", rank, status.MPI_SOURCE, status.MPI_TAG);
if (status.MPI_TAG == ELECT) {
received_count++;
neighbor_received_from[status.MPI_SOURCE] = 1;
printf("Process %d: Received ELECT from %d\n", rank, status.MPI_SOURCE);
// If all neighbors except one have sent ELECT messages, probe for incoming messages before sending ELECT to the remaining neighbor
if (received_count == num_of_neighbors - 1) {
int flag;
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);
if (flag && status.MPI_TAG == TERMINATE_LELECT_GS) {
printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);
if (status.MPI_SOURCE > rank) {
printf("Process %d: Surrendering leadership to process %d\n", rank, status.MPI_SOURCE);
MPI_Send(&rank, 1, MPI_INT, status.MPI_SOURCE, GS_LEADER, comm);
is_leader = 0;
} else {
printf("Process %d: Declaring leadership over process %d\n", rank, status.MPI_SOURCE);
is_leader = 1;
}
// send out terminate to all neighbors but the sender
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] != status.MPI_SOURCE) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
}
break;
}
if (!flag) {
// if we got elect from all but one, send elect to remaining neighbor
for (int i = 0; i < num_of_neighbors; i++) {
if (!neighbor_received_from[neighbor_gs[i]]) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], ELECT, comm);
printf("Process %d: Sent ELECT to remaining neighbor %d\n", rank, neighbor_gs[i]);
break;
}
}
}
}
// If all neighbors have sent ELECT messages, this process is a candidate for the leader
if (received_count == num_of_neighbors) {
printf("Process %d: Candidate for leader\n", rank);
is_leader = 1;
// send terminate to all neighbors
for (int i = 0; i < num_of_neighbors; i++) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
break;
}
} else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] != status.MPI_SOURCE) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
}
break;
} else if (status.MPI_TAG == GS_LEADER) {
printf("Process %d: Declared leader due to GS_LEADER message\n", rank);
is_leader = 1;
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] != status.MPI_SOURCE) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
}
}
break;
}
}
MPI_Barrier(comm);
printf("Process %d: Terminated\n", rank);
if (is_leader) {
printf("Process %d: Sending LELECT_GS_DONE to coordinator\n", rank);
MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
}
}
这是有问题的输出的示例:
parsing START_LELECT_GS event
Process 0: Starting leader election
Process 0: Is leaf? 0
Process 1: Starting leader election
Process 1: Is leaf? 0
Process 4: Starting leader election
Process 4: Is leaf? 1
Leaf Process 4: Sent ELECT to 0
Process 2: Starting leader election
Process 3: Starting leader election
Process 3: Is leaf? 1
Leaf Process 3: Sent ELECT to 1
Process 2: Is leaf? 1
Leaf Process 2: Sent ELECT to 1
Process 0: Received message from 4 with tag 22
Process 0: Received ELECT from 4
Process 0: Sent ELECT to remaining neighbor 1
Process 1: Received message from 3 with tag 22
Process 1: Received ELECT from 3
Process 1: Received message from 2 with tag 22
Process 1: Received ELECT from 2
Process 1: Sent ELECT to remaining neighbor 0
Process 1: Received message from 0 with tag 22
Process 1: Received ELECT from 0
Process 1: Candidate for leader
Process 1: Sent TERMINATE_LELECT_GS to 3
Process 1: Sent TERMINATE_LELECT_GS to 0
Process 1: Sent TERMINATE_LELECT_GS to 2
Process 0: Received message from 1 with tag 22
Process 0: Received ELECT from 1
Process 0: Candidate for leader
Process 0: Sent TERMINATE_LELECT_GS to 1
Process 0: Sent TERMINATE_LELECT_GS to 4
Process 4: Received message from 0 with tag 24
Process 4: Received TERMINATE_LELECT_GS from 0
Process 3: Received message from 1 with tag 24
Process 3: Received TERMINATE_LELECT_GS from 1
Process 2: Received message from 1 with tag 24
Process 2: Received TERMINATE_LELECT_GS from 1
Process 0: Terminated
Process 0: Sending LELECT_GS_DONE to coordinator
Process 3: Terminated
Process 4: Terminated
Process 1: Terminated
Process 1: Sending LELECT_GS_DONE to coordinator
process 10 waiting at barrier
Process 2: Terminated
出现了两个候选者,但尽管我试图探测中间终止消息以便我可以尽早打破平局,但它不起作用,但我无法弄清楚原因。
我确实设法实现了上述算法,现在似乎可以正常工作。如果有人感兴趣的话,这是代码。
附注如果您发现任何错误,请告诉我!
void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
MPI_Status status;
int leader_rank = -1;
int received_count = 0;
int neighbor_received_from[num_of_neighbors];
for (int i = 0; i < num_of_neighbors; i++) neighbor_received_from[i] = 0;
int remaining_neighbor = -1;
printf("Process %d: Starting leader election\n", rank);
// Determine if the process is a leaf node
const int is_leaf = (num_of_neighbors == 1);
printf("Process %d: Is leaf? %d\n", rank, is_leaf);
// Send initial <ELECT> messages only if the process is a leaf node
if (is_leaf) {
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
}
do {
int sender_rank;
while (received_count < num_of_neighbors - 1) {
MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
if (status.MPI_TAG == ELECT) {
for (int i = 0; i < num_of_neighbors; i++) {
if (neighbor_gs[i] == status.MPI_SOURCE) {
neighbor_received_from[i] = 1;
}
}
received_count++;
printf("Process %d got ELECT from %d (replies = %d)\n", rank, status.MPI_SOURCE, received_count);
if (received_count == num_of_neighbors - 1) break;
}
if (status.MPI_TAG == TERMINATE_LELECT_GS) {
// leader has been found, terminate and notify all neighbors by sending leader rank
leader_rank = sender_rank;
printf("Process %d got TERMINATE from %d with leader %d\n", rank, status.MPI_SOURCE, sender_rank);
for (int i = 0; i < num_of_neighbors; i++) {
printf("Process %d propagating leader to process %d\n", rank, neighbor_gs[i]);
if(neighbor_gs[i] != status.MPI_SOURCE) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
}
break;
}
}
if (leader_rank != -1) break; // got terminate while still sending elects, terminate early
printf("iterating for remaining neighbor\n");
// we have received replies from all neighbors but one
for (int i = 0; i < num_of_neighbors; i++) {
printf("considering %d with found %d\n", neighbor_gs[i], neighbor_received_from[i]);
if (!neighbor_received_from[i]) {
printf("remaining neighbor is %d\n", neighbor_gs[i]);
remaining_neighbor = neighbor_gs[i];
break;
}
}
if (remaining_neighbor != -1 && !is_leaf) {
// if we are a leaf, don't resend
printf("Process %d has received all replies but one (from %d)...\n", rank, remaining_neighbor);
int flag;
// probe to see if the remaining neighbor is sending an ELECT before we send it one
MPI_Iprobe(remaining_neighbor, ELECT, comm, &flag, &status);
if (flag) {
MPI_Recv(&sender_rank, 1, MPI_INT, remaining_neighbor, ELECT, comm, &status);
// we got our last elect response, so we are the leader
printf("Process %d is leader because it received ELECT from remaining neighbor\n", rank);
// note: could update count and received neighbors but no need
leader_rank = rank;
// send out our rank to all neighbors
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d propagating winner (us) to %d\n", rank, neighbor_gs[j]);
MPI_Send(&rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
} else {
// else we send to the last neighbor
printf("Process %d, no incoming last elect from probe, sending elect to last neighbor\n", rank);
MPI_Send(&rank, 1, MPI_INT, remaining_neighbor, ELECT, comm);
}
}
// have sent to all neighbors, wait for outcome
printf("Process %d has sent all elects, now waiting for final event\n", rank);
MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
if (status.MPI_TAG == ELECT && status.MPI_SOURCE == remaining_neighbor) {
// probe for terminate in case someone else was elected after the elect we received was sent
int flag = 0;
MPI_Iprobe(MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &flag, &status);
if (flag) {
// another process was declared leader faster, abort
MPI_Recv(&leader_rank, 1, MPI_INT, MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &status);
printf("Process %d got terminate from %d because another process won first!\n", rank,
status.MPI_SOURCE);
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d sending terminate to %d\n", rank, neighbor_gs[j]);
if(neighbor_gs[j] != status.MPI_SOURCE) MPI_Send(&sender_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
}
// means we got our last elect but after we sent out an elect to the remaining neighbor
// incoming elect right after we sent elect the same way, so contest
if (rank > sender_rank) {
printf("Process %d got elect on same edge from %d and won\n", rank, sender_rank);
// we win the fight and are the leader
leader_rank = rank;
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d propagating ourself (winner) to %d\n", rank, neighbor_gs[j]);
if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
} else {
// sender has won and is leader
printf("Process %d got elect on same edge and lost fight. winner was %d\n", rank,
status.MPI_SOURCE);
leader_rank = status.MPI_SOURCE;
for (int j = 0; j < num_of_neighbors; j++) {
printf("Process %d propagating other process/winner to %d\n", rank, neighbor_gs[j]);
if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
}
} else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
// we got terminate signal, notify neighbors
leader_rank = sender_rank;
printf("Process %d got terminate from %d with leader being %d!\n", rank, status.MPI_SOURCE, leader_rank);
for (int j = 0; j < num_of_neighbors; j++) {
if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
}
break;
}
} while (1);
printf("Process %d exited loop with leader being %d\n", rank, leader_rank);
MPI_Barrier(comm);
if (rank == leader_rank) {
printf("Leader sending done to coordinator\n");
MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
}
}