服务器端事件C++实现?

问题描述 投票:0回答:2

我正在尝试实现一个 C++ 服务器来为 javascript EventSource 生成事件,我正在使用 cpprest 构建它。从我在 PHP 或 Node.js 中看到的示例来看,它看起来非常简单,但我一定错过了一些东西,因为我在 Firefox 控制台中得到了这个:

Firefox can’t establish a connection to the server at http://localhost:32123/data.

使用邮递员,我正确接收了

"data : test"
,所以我认为我错过了一些延续,可能需要做的不仅仅是回复请求,但我没有找到关于这应该如何进行的良好解释还没工作呢。如果您有一些文档可以指出我,我将不胜感激!

html页面脚本如下所示:

var source = new EventSource("http://localhost:32123/data");

source.onmessage = function (event) {
    document.getElementById("result1").innerHTML += event.data + "<br>";
};

C++ 服务器响应:

wResponse.set_status_code(status_codes::OK);
wResponse.headers().add(U("Access-Control-Allow-Origin"), U("*"));
wResponse.set_body(U("data: test"));
iRequest.reply(wResponse);

我的服务器收到的请求:

GET /data HTTP/1.1
Accept: text/event-stream
Accept-Encoding: gzip, deflate
Accept-Language: en-us, en;q=0.5
Cache-Control: no-cache
Connection: keep-alive
Host: localhost:32123
Origin: null
Pragma: no-cache
User-Agent: Mozilla/5.0 (Windows NT6.1; Win64, x64; rv:61.0) Gecko/20100101 Firefox/61.0
c++ server-side eventsource cpprest-sdk
2个回答
1
投票

在这里找到解决方案

这是一个小证明。它一点也不完美,但它正在发挥作用。下一步是弄清楚如何存储连接,检查它们是否处于活动状态,等等......

编辑:在达伦评论后更新答案

正确的解决方案似乎是围绕将

producer_consumer_buffer<char>
喂给绑定到设置为
basic_istream<uint8_t>
主体的
http_response
来展开。

然后,一旦

http_request::reply
完成,连接将保持打开状态,直到缓冲区关闭,这可以通过
wBuffer.close(std::ios_base::out).wait();
完成。

我不是 100% 确定,但似乎

wBuffer.sync().wait();
的作用就像 PHP
flush
命令将在类似的 event-providing-server 场景中使用。

下面添加了一个工作示例。

显然这不是一个完整的解决方案。管理连接等方面还有更多乐趣。用

Connection
实例化一些
make_unique
并将它们存储到事件访问的容器中可能是我的方法......

main.cpp

#include "cpprest/uri.h"
#include "cpprest/producerconsumerstream.h"
#include "cpprest/http_listener.h"

using namespace std;
using namespace web;
using namespace http;
using namespace utility;
using namespace concurrency;
using namespace http::experimental::listener;

struct MyServer
{
  MyServer(string_t url);
  pplx::task<void> open()  { return mListener.open(); };
  pplx::task<void> close() { return mListener.close(); };

private:

  void handleGet(http_request iRequest);
  http_listener mListener;
};

MyServer::MyServer(utility::string_t url) : mListener(url)
{
  mListener.support(methods::GET, bind(&MyServer::handleGet, this, placeholders::_1));
}

void MyServer::handleGet(http_request iRequest)
{
  ucout << iRequest.to_string() << endl;

  http_response wResponse;

  // Setting headers
  wResponse.set_status_code(status_codes::OK);
  wResponse.headers().add(header_names::access_control_allow_origin, U("*"));
  wResponse.headers().add(header_names::content_type, U("text/event-stream"));

  // Preparing buffer
  streams::producer_consumer_buffer<char> wBuffer;
  streams::basic_istream<uint8_t> wStream(wBuffer);
  wResponse.set_body(wStream);

  auto wReplyTask = iRequest.reply(wResponse);

  wBuffer.putn_nocopy("data: a\n",10).wait();
  wBuffer.putn_nocopy("data: b\n\n",12).wait();
  wBuffer.sync().wait();  // seems equivalent to 'flush'

  this_thread::sleep_for(chrono::milliseconds(2000));

  wBuffer.putn_nocopy("data: c\n", 10).wait();
  wBuffer.putn_nocopy("data: d\n\n", 12).wait();
  wBuffer.sync().wait();
  // wBuffer.close(std::ios_base::out).wait();    // closes the connection
  wReplyTask.wait();      // blocking!
}

unique_ptr<MyServer> gHttp;

void onInit(const string_t iAddress)
{
  uri_builder wUri(iAddress);
  auto wAddress = wUri.to_uri().to_string();
  gHttp = unique_ptr<MyServer>(new MyServer(wAddress));

  gHttp->open().wait();
  ucout << string_t(U("Listening for requests at: ")) << wAddress << endl;

}

void onShutdown()
{
  gHttp->close().wait();
}


void main(int argc, wchar_t* argv[])
{

  onInit(U("http://*:32123"));

  cout << "Wait until connection occurs..." << endl;
  getchar();

  onShutdown();
}

sse.htm

<!DOCTYPE html>
<html lang="en-US">
<head>
<meta charset="utf-8">
</head>
    <body>
        <div id="result"></div>
    </body>
</html>

<script>

    if (typeof (EventSource) !== undefined)
    {
        document.getElementById("result").innerHTML += "SSE supported" + "<br>";
    } 
    else
    {
        document.getElementById("result").innerHTML += "SSE NOT supported" + "<br>";
    }

    var source = new EventSource("http://localhost:32123/");

    source.onopen = function ()
    {
        document.getElementById("result").innerHTML += "open" + "<br>";
    };

    source.onerror = function ()
    {        
        document.getElementById("result").innerHTML += "error" + "<br>";
    };

    source.onmessage = function (event) {
        document.getElementById("result").innerHTML += event.data + "<br>";
    };

</script>

0
投票

仅供参考。有一种基于brpc的解决方案:https://github.com/apache/brpc/pull/2375

© www.soinside.com 2019 - 2024. All rights reserved.