如何使用 gRPC C++ 异步读取服务器端流

问题描述 投票:0回答:1

我正在尝试使用 C++ 实现异步服务器端流。但我找不到任何好的例子。我很难异步读取流。

服务器代码

/// Asynchronous gRPC server that answers every sayHello request with a short
/// server-side stream of HelloReply messages followed by an OK status.
class ServerImpl final
{
public:
    /// Shuts down the server first, then the completion queue, and finally
    /// drains the queue so that no CallData object is left behind.
    ~ServerImpl()
    {
        // Run() may never have been called, in which case both are null.
        if (server_)
        {
            server_->Shutdown();
        }
        if (cq_)
        {
            // Always shut down the completion queue after the server.
            cq_->Shutdown();

            // A completion queue that was polled with Next() has to be drained
            // before it is destroyed. Every tag is a CallData that owns itself,
            // so release it here.
            void* tag = nullptr;
            bool ok = false;
            while (cq_->Next(&tag, &ok))
            {
                delete static_cast<CallData*>(tag);
            }
        }
    }

    /// Builds and starts the server on 0.0.0.0:50051, then serves requests
    /// on the calling thread until the completion queue is shut down.
    void Run()
    {
        std::string server_address("0.0.0.0:50051");

        ServerBuilder builder;
        builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
        builder.RegisterService(&service_);

        cq_ = builder.AddCompletionQueue();
        server_ = builder.BuildAndStart();
        if (!server_)
        {
            // BuildAndStart() returns null on failure (e.g. port already bound).
            std::cerr << "Failed to start server on " << server_address << std::endl;
            return;
        }
        std::cout << "Server listening on " << server_address << std::endl;

        HandleRpcs();
    }

private:
    /// State machine for one streaming RPC. Instances are heap-allocated, use
    /// their own address as the completion-queue tag, and delete themselves
    /// once the call has finished.
    class CallData
    {
    public:
        /// @param service  async service used to request the next incoming call
        /// @param cq       completion queue that delivers this call's events
        CallData(MultiGreeter::AsyncService* service, ServerCompletionQueue* cq)
            : service_(service)
            , cq_(cq)
            , responder_(&ctx_)
        {
            Proceed();
        }

        /// Advances the call by one step. Must be invoked exactly once per
        /// successful completion-queue event carrying this object as tag.
        void Proceed()
        {
            switch (status_)
            {
            case CallStatus::CREATE:
                status_ = CallStatus::PROCESS;
                // Ask gRPC to hand the next incoming sayHello call to this object.
                service_->RequestsayHello(&ctx_, &request_, &responder_, cq_, cq_, this);
                break;

            case CallStatus::PROCESS:
                if (times_ == 0)
                {
                    // First event for this call: spawn a fresh CallData so the
                    // next client can be accepted while this one is streaming.
                    new CallData(service_, cq_);
                }
                if (times_++ >= kRepliesPerCall)
                {
                    status_ = CallStatus::FINISH;
                    responder_.Finish(Status::OK, this);
                }
                else
                {
                    std::string prefix("Hello ");
                    reply_.set_message(prefix + request_.name() + ", no " + request_.num_greetings());
                    // Only one Write may be outstanding; the next one is issued
                    // when this tag comes back from the completion queue.
                    responder_.Write(reply_, this);
                }
                break;

            case CallStatus::FINISH:
                delete this;
                break;
            }
        }

    private:
        enum class CallStatus
        {
            CREATE,
            PROCESS,
            FINISH
        };

        /// Number of HelloReply messages streamed before the call is finished.
        static constexpr int kRepliesPerCall = 3;

        MultiGreeter::AsyncService* service_;
        ServerCompletionQueue* cq_;
        ServerContext ctx_;
        HelloRequest request_;
        HelloReply reply_;
        ServerAsyncWriter<HelloReply> responder_;
        int times_ = 0;
        CallStatus status_ = CallStatus::CREATE;
    };

    /// Event loop: pulls tags from the completion queue and drives the
    /// matching CallData. Returns once the queue is shut down and drained.
    void HandleRpcs()
    {
        new CallData(&service_, cq_.get());
        void* tag = nullptr;
        bool ok = false;
        while (cq_->Next(&tag, &ok))
        {
            CallData* call = static_cast<CallData*>(tag);
            if (!ok)
            {
                // The operation did not go through (client cancelled or went
                // away, or the server is shutting down). No further event will
                // arrive for this call, so release it instead of aborting.
                delete call;
                continue;
            }
            call->Proceed();
        }
    }

    std::unique_ptr<ServerCompletionQueue> cq_;
    MultiGreeter::AsyncService service_;
    std::unique_ptr<Server> server_;
};

客户端代码

class GreeterClient
{
public:
    GreeterClient(std::shared_ptr<Channel> channel)
        : stub_(MultiGreeter::NewStub(channel))
    {}
    void SayHello(const std::string& user, const std::string& num_greetings)
    {
        HelloRequest request;
        request.set_name(user);
        request.set_num_greetings(num_greetings);
        ClientContext context;
        CompletionQueue cq;
        void* got_tag = (void*)1;
        bool ok = false;
        std::unique_ptr<ClientAsyncReader<HelloReply>>       reader(stub_>PrepareAsyncSayHello(&context,request, &cq));    
        std::cout << "Got reply: " << reply.message() << std::endl;          
        reader->Read(&reply,got_tag);
        Status status;
        reader->Finish(&status, (void*)1);
        GPR_ASSERT(cq.Next(&got_tag, &ok));
        GPR_ASSERT(ok);   
        if (status.ok()) 
        {
            std::cout << "sayHello rpc succeeded." << std::endl;
        } 
        else 
        {
            std::cout << "sayHello rpc failed." << std::endl;
            std::cout << status.error_code() << ": " << status.error_message() << std::endl;
        }
    }
private:
    std::unique_ptr<MultiGreeter::Stub> stub_;
};

我的 proto 服务定义

rpc SayHello (HelloRequest) returns (stream HelloReply) {}

当我使用 ClientReader 以同步方式读取流时,一切正常。但是当我使用 ClientAsyncReader 时,会出现以下运行时错误。

Assertion failed: (started_), function Finish, file /usr/local/include/grpcpp/impl/codegen/async_stream_impl.h, line 250.
23:25:40: The program has unexpectedly finished.

提前致谢。

c++ asynchronous streaming grpc
1个回答
0
投票

错误消息

Assertion failed: (started_), function Finish,
表明在通过
Read
发起的异步读取操作完成之前,就调用了
ClientAsyncReader
的
Finish
函数。这会破坏异步操作的预期顺序。

问题是在

Read
操作完成之前就调用了
Finish
函数。使用
ClientAsyncReader
时,应先执行
Read
,等待其完成,然后再执行
Finish
。另外,通过 PrepareAsync 系列函数创建的调用必须先调用 StartCall 并等待其完成,之后才能执行 Read 或 Finish,否则会触发 started_ 断言。
针对你的情况,可以尝试如下写法:

...
reader->Read(&reply, got_tag);

// Wait for the read operation to complete
void* got_tag_read;
bool ok_read;
GPR_ASSERT(cq.Next(&got_tag_read, &ok_read));
GPR_ASSERT(ok_read);

Status status;
reader->Finish(&status, (void*)1);

// Wait for the Finish operation to complete
void* got_tag_finish;
bool ok_finish;
GPR_ASSERT(cq.Next(&got_tag_finish, &ok_finish));
GPR_ASSERT(ok_finish);
...
© www.soinside.com 2019 - 2024. All rights reserved.