下面是双向流的原型
service RouteGuide {
// A Bidirectional streaming RPC.
//
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
Go 定义了一个接口,用于在从服务器到客户端的双向流中发送和接收消息,反之亦然:go 接口。但我在 Python 中找不到相同的功能,可以在流内发送响应而不进行额外的
RouteChat
调用。
提供的示例也没有讨论从 client
发回请求我们如何在同一流中将来自 python grpc 客户端的请求从客户端发送回服务器?
def generate_messages():
messages = [
make_route_note("First message", 0, 0),
make_route_note("Second message", 0, 1),
make_route_note("Third message", 1, 0),
make_route_note("Fourth message", 0, 0),
make_route_note("Fifth message", 1, 0),
]
for msg in messages:
print("Sending %s at %s" % (msg.message, msg.location))
yield msg
def guide_route_chat(stub):
responses = stub.RouteChat(generate_messages())
for response in responses:
print("Received message %s at %s" % (response.message, response.location))
对于以下原型:
syntax = "proto3";
package Streaming;
service Streaming{
rpc RpcFunction(stream Request) returns (stream Response) {}
}
message Request {
oneof stream_input {
bytes data = 1;
}
}
message Response {
oneof stream_output {
bytes data = 1;
}
}
您可以使用以下函数异步发送接收消息:
async with grpc.aio.insecure_channel("0.0.0.0:8001",
options=[ ('grpc.max_send_message_length', MAX_MESSAGE_LENGTH),
('grpc.max_receive_message_length', MAX_MESSAGE_LENGTH)]
) as channel:
try:
await asyncio.wait_for(channel.channel_ready(), timeout=15)
except asyncio.TimeoutError:
sys.exit('Error connecting to server')
stub = streaming_pb2_grpc.StreamingStub(channel)
stream = stub.RpcFunction()
for itr in range(0, len(msg)):
request = streaming_pb2.Request(data=msg[itr].tobytes())
await stream.write(request)
response = await stream.read()
print(response.data.decode("utf-8"))
您可以使用相同的流对象在双向 gRPC 中发送和接收消息。要将消息从客户端发送回服务器,您只需使用要发送的消息调用流对象上的
stream.write()
方法即可。
您可以将您的此功能更改为这样:
def guide_route_chat(stub):
stream = stub.RouteChat(generate_messages())
for response in stream:
print("Received message %s at %s" % (response.message, response.location))
# Send a message back to the server
request = RouteNote()
request.message = "Sending message back to server."
request.location.latitude = 37.7749
request.location.longitude = -122.4194
stream.write(request)
请务必记住,流需要处于活动状态才能发回消息。您可以通过
stream.isactive()
查看