无根生成树中的异步领导者选举宣布多个获胜者

问题描述 投票:0回答:1

我正在尝试使用 MPI 实现图像中描述的算法。这是大学项目的一部分,我们正在该项目中构建分布式卫星到地面站通信系统。我已经对其进行了多次迭代,但当选出多个候选获胜者时,我永远无法打破平局。

async leader election algorithm

这是处理选举过程的函数。每个地面站进程都会调用此函数(其通信组中有 5 个,GS 0、1、2、3 和 4),并通过来自协调器进程(排名 10)的

START_LELECT_GS
事件启动。

void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
    MPI_Status status;
    int is_leader = 0;
    int received_count = 0;
    int neighbor_received_from[num_of_neighbors];

    printf("Process %d: Starting leader election\n", rank);

    // Determine if the process is a leaf node
    const int is_leaf = (num_of_neighbors == 1);
    printf("Process %d: Is leaf? %d\n", rank, is_leaf);

    // Send initial <ELECT> messages only if the process is a leaf node
    if (is_leaf) {
        MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
        printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
    }

    // Receive and process messages from neighbors
    while (1) {
        int sender_rank;
        MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);
        printf("Process %d: Received message from %d with tag %d\n", rank, status.MPI_SOURCE, status.MPI_TAG);

        if (status.MPI_TAG == ELECT) {
            received_count++;
            neighbor_received_from[status.MPI_SOURCE] = 1;
            printf("Process %d: Received ELECT from %d\n", rank, status.MPI_SOURCE);

            // If all neighbors except one have sent ELECT messages, probe for incoming messages before sending ELECT to the remaining neighbor
            if (received_count == num_of_neighbors - 1) {
                int flag;
                MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);

                if (flag && status.MPI_TAG == TERMINATE_LELECT_GS) {
                    printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);

                    if (status.MPI_SOURCE > rank) {
                        printf("Process %d: Surrendering leadership to process %d\n", rank, status.MPI_SOURCE);
                        MPI_Send(&rank, 1, MPI_INT, status.MPI_SOURCE, GS_LEADER, comm);
                        is_leader = 0;
                    } else {
                        printf("Process %d: Declaring leadership over process %d\n", rank, status.MPI_SOURCE);
                        is_leader = 1;
                    }

                    // send out terminate to all neighbors but the sender
                    for (int i = 0; i < num_of_neighbors; i++) {
                        if (neighbor_gs[i] != status.MPI_SOURCE) {
                            MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                            printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                        }
                    }
                    break;
                }

                if (!flag) {
                    // if we got elect from all but one, send elect to remaining neighbor
                    for (int i = 0; i < num_of_neighbors; i++) {
                        if (!neighbor_received_from[neighbor_gs[i]]) {
                            MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], ELECT, comm);
                            printf("Process %d: Sent ELECT to remaining neighbor %d\n", rank, neighbor_gs[i]);
                            break;
                        }
                    }
                }
            }

            // If all neighbors have sent ELECT messages, this process is a candidate for the leader
            if (received_count == num_of_neighbors) {
                printf("Process %d: Candidate for leader\n", rank);
                is_leader = 1;
                // send terminate to all neighbors
                for (int i = 0; i < num_of_neighbors; i++) {
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                }
                break;
            }
        } else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
            printf("Process %d: Received TERMINATE_LELECT_GS from %d\n", rank, status.MPI_SOURCE);

            for (int i = 0; i < num_of_neighbors; i++) {
                if (neighbor_gs[i] != status.MPI_SOURCE) {
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    printf("Process %d: Forwarded TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                }
            }
            break;
        } else if (status.MPI_TAG == GS_LEADER) {
            printf("Process %d: Declared leader due to GS_LEADER message\n", rank);
            is_leader = 1;
            for (int i = 0; i < num_of_neighbors; i++) {
                if (neighbor_gs[i] != status.MPI_SOURCE) {
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                    printf("Process %d: Sent TERMINATE_LELECT_GS to %d\n", rank, neighbor_gs[i]);
                }
            }
            break;
        }
    }

    MPI_Barrier(comm);
    printf("Process %d: Terminated\n", rank);
    if (is_leader) {
        printf("Process %d: Sending LELECT_GS_DONE to coordinator\n", rank);
        MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
    }
}

这是有问题的输出的示例:

parsing START_LELECT_GS event
Process 0: Starting leader election
Process 0: Is leaf? 0
Process 1: Starting leader election
Process 1: Is leaf? 0
Process 4: Starting leader election
Process 4: Is leaf? 1
Leaf Process 4: Sent ELECT to 0
Process 2: Starting leader election
Process 3: Starting leader election
Process 3: Is leaf? 1
Leaf Process 3: Sent ELECT to 1
Process 2: Is leaf? 1
Leaf Process 2: Sent ELECT to 1
Process 0: Received message from 4 with tag 22
Process 0: Received ELECT from 4
Process 0: Sent ELECT to remaining neighbor 1
Process 1: Received message from 3 with tag 22
Process 1: Received ELECT from 3
Process 1: Received message from 2 with tag 22
Process 1: Received ELECT from 2
Process 1: Sent ELECT to remaining neighbor 0
Process 1: Received message from 0 with tag 22
Process 1: Received ELECT from 0
Process 1: Candidate for leader
Process 1: Sent TERMINATE_LELECT_GS to 3
Process 1: Sent TERMINATE_LELECT_GS to 0
Process 1: Sent TERMINATE_LELECT_GS to 2
Process 0: Received message from 1 with tag 22
Process 0: Received ELECT from 1
Process 0: Candidate for leader
Process 0: Sent TERMINATE_LELECT_GS to 1
Process 0: Sent TERMINATE_LELECT_GS to 4
Process 4: Received message from 0 with tag 24
Process 4: Received TERMINATE_LELECT_GS from 0
Process 3: Received message from 1 with tag 24
Process 3: Received TERMINATE_LELECT_GS from 1
Process 2: Received message from 1 with tag 24
Process 2: Received TERMINATE_LELECT_GS from 1
Process 0: Terminated
Process 0: Sending LELECT_GS_DONE to coordinator
Process 3: Terminated
Process 4: Terminated
Process 1: Terminated
Process 1: Sending LELECT_GS_DONE to coordinator
process 10 waiting at barrier
Process 2: Terminated

出现了两个候选者,但尽管我试图探测中间终止消息以便我可以尽早打破平局,但它不起作用,但我无法弄清楚原因。

c mpi distributed-computing
1个回答
0
投票

我确实设法实现了上述算法,现在似乎可以正常工作。如果有人感兴趣的话,这是代码。

附注如果您发现任何错误,请告诉我!

void perform_gs_leader_election(int coordinator_rank, int rank, MPI_Comm comm) {
    MPI_Status status;
    int leader_rank = -1;
    int received_count = 0;
    int neighbor_received_from[num_of_neighbors];
    for (int i = 0; i < num_of_neighbors; i++) neighbor_received_from[i] = 0;
    int remaining_neighbor = -1;

    printf("Process %d: Starting leader election\n", rank);

    // Determine if the process is a leaf node
    const int is_leaf = (num_of_neighbors == 1);
    printf("Process %d: Is leaf? %d\n", rank, is_leaf);

    // Send initial <ELECT> messages only if the process is a leaf node
    if (is_leaf) {
        MPI_Send(&rank, 1, MPI_INT, neighbor_gs[0], ELECT, comm);
        printf("Leaf Process %d: Sent ELECT to %d\n", rank, neighbor_gs[0]);
    }

    do {
        int sender_rank;
        while (received_count < num_of_neighbors - 1) {
            MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);

            if (status.MPI_TAG == ELECT) {
                for (int i = 0; i < num_of_neighbors; i++) {
                    if (neighbor_gs[i] == status.MPI_SOURCE) {
                        neighbor_received_from[i] = 1;
                    }
                }
                received_count++;
                printf("Process %d got ELECT from %d (replies = %d)\n", rank, status.MPI_SOURCE, received_count);
                if (received_count == num_of_neighbors - 1) break;
            }

            if (status.MPI_TAG == TERMINATE_LELECT_GS) {
                // leader has been found, terminate and notify all neighbors by sending leader rank
                leader_rank = sender_rank;
                printf("Process %d got TERMINATE from %d with leader %d\n", rank, status.MPI_SOURCE, sender_rank);
                for (int i = 0; i < num_of_neighbors; i++) {
                    printf("Process %d propagating leader to process %d\n", rank, neighbor_gs[i]);
                    if(neighbor_gs[i] != status.MPI_SOURCE) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[i], TERMINATE_LELECT_GS, comm);
                }
                break;
            }
        }

        if (leader_rank != -1) break; // got terminate while still sending elects, terminate early

        printf("iterating for remaining neighbor\n");
        // we have received replies from all neighbors but one
        for (int i = 0; i < num_of_neighbors; i++) {
            printf("considering %d with found %d\n", neighbor_gs[i], neighbor_received_from[i]);
            if (!neighbor_received_from[i]) {
                printf("remaining neighbor is %d\n", neighbor_gs[i]);
                remaining_neighbor = neighbor_gs[i];
                break;
            }
        }

        if (remaining_neighbor != -1 && !is_leaf) {
            // if we are a leaf, don't resend
            printf("Process %d has received all replies but one (from %d)...\n", rank, remaining_neighbor);
            int flag;
            // probe to see if the remaining neighbor is sending an ELECT before we send it one
            MPI_Iprobe(remaining_neighbor, ELECT, comm, &flag, &status);

            if (flag) {
                MPI_Recv(&sender_rank, 1, MPI_INT, remaining_neighbor, ELECT, comm, &status);

                // we got our last elect response, so we are the leader
                printf("Process %d is leader because it received ELECT from remaining neighbor\n", rank);
                // note: could update count and received neighbors but no need
                leader_rank = rank;
                // send out our rank to all neighbors
                for (int j = 0; j < num_of_neighbors; j++) {
                    printf("Process %d propagating winner (us) to %d\n", rank, neighbor_gs[j]);
                    MPI_Send(&rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                }
                break;
            } else {
                // else we send to the last neighbor
                printf("Process %d, no incoming last elect from probe, sending elect to last neighbor\n", rank);
                MPI_Send(&rank, 1, MPI_INT, remaining_neighbor, ELECT, comm);
            }
        }

        // have sent to all neighbors, wait for outcome
        printf("Process %d has sent all elects, now waiting for final event\n", rank);
        MPI_Recv(&sender_rank, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &status);

        if (status.MPI_TAG == ELECT && status.MPI_SOURCE == remaining_neighbor) {
            // probe for terminate in case someone else was elected after the elect we received was sent
            int flag = 0;
            MPI_Iprobe(MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &flag, &status);

            if (flag) {
                // another process was declared leader faster, abort
                MPI_Recv(&leader_rank, 1, MPI_INT, MPI_ANY_SOURCE, TERMINATE_LELECT_GS, comm, &status);
                printf("Process %d got terminate from %d because another process won first!\n", rank,
                       status.MPI_SOURCE);
                for (int j = 0; j < num_of_neighbors; j++) {
                    printf("Process %d sending terminate to %d\n", rank, neighbor_gs[j]);
                    if(neighbor_gs[j] != status.MPI_SOURCE) MPI_Send(&sender_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                }
                break;
            }


            // means we got our last elect but after we sent out an elect to the remaining neighbor
            // incoming elect right after we sent elect the same way, so contest
            if (rank > sender_rank) {
                printf("Process %d got elect on same edge from %d and won\n", rank, sender_rank);
                // we win the fight and are the leader
                leader_rank = rank;
                for (int j = 0; j < num_of_neighbors; j++) {
                    printf("Process %d propagating ourself (winner) to %d\n", rank, neighbor_gs[j]);
                    if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                }
                break;
            } else {
                // sender has won and is leader
                printf("Process %d got elect on same edge and lost fight. winner was %d\n", rank,
                       status.MPI_SOURCE);
                leader_rank = status.MPI_SOURCE;
                for (int j = 0; j < num_of_neighbors; j++) {
                    printf("Process %d propagating other process/winner to %d\n", rank, neighbor_gs[j]);
                    if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
                }
                break;
            }
        } else if (status.MPI_TAG == TERMINATE_LELECT_GS) {
            // we got terminate signal, notify neighbors
            leader_rank = sender_rank;
            printf("Process %d got terminate from %d with leader being %d!\n", rank, status.MPI_SOURCE, leader_rank);
            for (int j = 0; j < num_of_neighbors; j++) {
                if(neighbor_gs[j] != remaining_neighbor) MPI_Send(&leader_rank, 1, MPI_INT, neighbor_gs[j], TERMINATE_LELECT_GS, comm);
            }
            break;
        }
    } while (1);
    printf("Process %d exited loop with leader being %d\n", rank, leader_rank);
    MPI_Barrier(comm);
    if (rank == leader_rank) {
        printf("Leader sending done to coordinator\n");
        MPI_Send(&rank, 1, MPI_INT, coordinator_rank, LELECT_GS_DONE, MPI_COMM_WORLD);
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.