使用 Grpc 实现最少连接负载平衡

问题描述 投票:0回答:1
least_connection.proto code
Node overloaded -- starting load balancing process
Traceback (most recent call last):
File "D:\lab7p2\least connection\node2.py", line 73, in <module>
node.check_load()
File "D:\lab7p2\least connection\node2.py", line 46, in check_load
self.balance_load()
File "D:\lab7p2\least connection\node2.py", line 36, in balance_load
request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: Message must be initialized with a dict: least_connection.LoadTransferMessage

node2.py代码

import threading
import time
import grpc
import least_connection_pb2 as least_connection_pb2
import least_connection_pb2_grpc as least_connection_pb2_grpc
from concurrent import futures

class Node(least_connection_pb2_grpc.LeastConnectionLoadBalancerServicer):
    def __init__(self):
        self.node_id = '2'
        self.server_address = 'localhost:50052'
        self.connections = 85  # Initial number of connections
        self.threshold = 80    # Threshold for load balancing
        self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]  # List of tuples (server address, connections)

    def get_least_connections_server(self):
        least_connections_server = min(self.servers, key=lambda server: server[1], default=None)
        return least_connections_server

    def balance_load(self):
        least_connections_server = self.get_least_connections_server()
        if least_connections_server is None:
            print("No server available for load transfer")
            return

        least_connections_address, least_connections_count = least_connections_server

        transfer_load = self.connections - self.threshold
        if transfer_load <= 0:
            print("No load to transfer")
            return

        channel = grpc.insecure_channel(self.server_address)
        stub = least_connection_pb2_grpc.LeastConnectionLoadBalancerStub(channel)

        request = least_connection_pb2.LoadTransferMessage(server=least_connections_address, load=int(transfer_load))
        response = stub.TransferLoad(request)
        if response.success:
            print(f"Node {self.node_id} --> {transfer_load} Units --> Server {least_connections_address}")
            self.connections -= transfer_load

    def check_load(self):
        print(f"Current node load: {self.connections}")
        if self.connections > self.threshold:
            print("Node overloaded -- starting load balancing process")
            self.balance_load()

    def getServerWithLeastConnections(self, request, context):
        least_connections_server = self.get_least_connections_server()
        server_obj = least_connection_pb2.Server(name=least_connections_server[0], current_connections=least_connections_server[1])
        return least_connection_pb2.LeastConnectionServer(server=server_obj, connections=least_connections_server[1])

def serve(node):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    least_connection_pb2_grpc.add_LeastConnectionLoadBalancerServicer_to_server(node, server)
    server.add_insecure_port('localhost:50052')
    server.start()
    server.wait_for_termination()

if __name__ == "__main__":
    node = Node()
    server_thread = threading.Thread(target=serve, args=(node,))
    server_thread.start()

    # Wait for a moment to ensure the server is up and running
    time.sleep(1)

    try:
        while True:
            user_input = input("Enter 1 to check status: ")

            if user_input == "1":
                node.check_load()
            print("--------------------------------------------")
    except KeyboardInterrupt:
        pass

类似node1,node3代码

原始代码

syntax = "proto3";

package least_connection;

message Server {
  string name = 1;
  int32 current_connections = 2;
}

message RequestData {
  string request_id = 1;
}

message LeastConnectionServer {
  Server server = 1;
  int32 connections = 2;
}

message LoadTransferMessage {
  Server server = 1;
  int32 load = 2;
}

service LeastConnectionLoadBalancer {
  rpc getServerWithLeastConnections(RequestData) returns (LeastConnectionServer);
  rpc TransferLoad(LoadTransferMessage) returns (LoadTransferResponse);
}

message LoadTransferResponse {
  bool success = 1;
}

python protocol-buffers grpc load-balancing distributed-computing
1个回答
0
投票

您的

Node
类有一个实例属性
server
定义为:

self.servers = [('localhost:50051', 70), ('localhost:50053', 65)]

其类型为

List
元组 (
str
,
int
)

所以,在

balance_load
行之后:

least_connections_server = self.get_least_connections_server()

least_connections_server
包含一个字符串元组,int

然后打开包装:

least_connections_address, least_connections_count = least_connections_server

least_connections_address
(
str
) 和
least_connections_count
(
int
)。

从你的原型中,

LoadTransferMessage
采用
server
类型的
Server
load
类型的
int

request = least_connection_pb2.LoadTransferMessage(
  server=least_connections_address,
  load=int(transfer_load),
)

但是

least_connections_address
str
不是
Server

你想要(类似的东西):

server = least_connection_pb2.Server(
  name=least_connections_address,
  current_connections=least_connections_count,
)

request = least_connection_pb2.LoadTransferMessage(
  server=server,
  load=int(transfer_load),
)
© www.soinside.com 2019 - 2024. All rights reserved.