如何将 boost asio tcp socket 传递给线程,以便向客户端或服务器发送心跳。

问题描述 投票:0回答:1

我正在用boost TCP编写一个客户服务器程序,其中我想每2秒向客户机发送一个HEARTBEAT消息,我正试图创建一个新的线程,以便于发送,但无法解决这个问题。我正在使用以下方法创建线程 boost::thread t(hearbeatSender,sock); 但却出现了很多错误,我也用bind来绑定函数名和socket,但没有解决这个错误。我也用bind将函数名与socket绑定,但没有解决这个错误。

void process(boost::asio::ip::tcp::socket & sock);
std::string read_data(boost::asio::ip::tcp::socket & sock);
void write_data(boost::asio::ip::tcp::socket & sock,std::string);
void hearbeatSender(boost::asio::ip::tcp::socket & sock);
int main()
{

    unsigned short port_num = 3333;
    boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4::any(), port_num);
    boost::asio::io_service io;
    try
    {
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        boost::asio::ip::tcp::socket sock(io);
        acceptor.accept(sock);
        boost::thread t(hearbeatSender,sock); 
        process(sock);
        t.join();

    }
    catch (boost::system::system_error &e)
    {
        std::cout << "Error occured! Error code = " << e.code()
        << ". Message: " << e.what();

        return e.code().value();
    }
  return 0;

}
void process(boost::asio::ip::tcp::socket & sock)
{
    while(1){
    std::string data = read_data(sock);
    std::cout<<"Client's request is: "<<data<<std::endl;
    write_data(sock,data);
    }
}
std::string read_data(boost::asio::ip::tcp::socket & sock)
{
    boost::asio::streambuf buf;
    boost::asio::read_until(sock, buf, "\n");
    std::string data = boost::asio::buffer_cast<const char*>(buf.data());
    return data;
}
void write_data(boost::asio::ip::tcp::socket & sock,std::string data)
{
    boost::system::error_code error;
    std::string msg;
    int ch = data[0]-'0';
    switch(ch)
    {
        case 1: msg = "Case 1\n"; break;
        case 2: msg = "Case 2\n"; break;
        case 3: msg = "Case 3\n"; break;
        case 4: msg = "Case 4\n"; break;
        default: msg  = "Case default\n"; break;
    }
    boost::asio::write( sock, boost::asio::buffer(msg+ "\n"), error );
     if( !error ) {
        std::cout << "Server sent hello message!" << std::endl;
     }
     else {
        std::cout << "send failed: " << error.message() << std::endl;
     }
}
void hearbeatSender(boost::asio::ip::tcp::socket & sock)
{
    boost::system::error_code error;
    std::string msg = "HEARTBEAT";
    while(1)
    {
        sleep(2);
        std::cout<<msg<<std::endl;
        boost::asio::write( sock, boost::asio::buffer(msg+ "\n"), error );
        if( !error ) {
        std::cout << "Server sent HEARTBEAT message!" << std::endl;
        }
        else {
            std::cout << "send failed: " << error.message() << std::endl;
        }
    }
}

这是一个服务器端的代码,用于响应客户端的消息并向客户端发送心跳。这是一个同步的TCP服务器。

c++ sockets boost tcp boost-asio
1个回答
1
投票

而不是这样。

    boost::asio::ip::tcp::socket sock(io);
    acceptor.accept(sock);
    boost::thread t(hearbeatSender,sock); 

用这个来代替:

    auto sock = acceptor.accept();
    std::thread t([&sock]() {
        hearbeatSender(sock);
    });

取而代之的是 sleep,只是使用std::this_thread::sleep进行普遍编译。

下面是编译和运行的完整程序。

#include <boost/asio.hpp>
#include <iostream>


void process(boost::asio::ip::tcp::socket& sock);
std::string read_data(boost::asio::ip::tcp::socket& sock);
void write_data(boost::asio::ip::tcp::socket& sock, std::string);
void hearbeatSender(boost::asio::ip::tcp::socket& sock);
int main()
{

    unsigned short port_num = 3333;
    boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4::any(), port_num);
    boost::asio::io_service io;
    try
    {
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        auto sock = acceptor.accept();
        std::thread t([&sock]() {
            hearbeatSender(sock);
        });
        process(sock);
        t.join();

    }
    catch (boost::system::system_error& e)
    {
        std::cout << "Error occured! Error code = " << e.code()
            << ". Message: " << e.what();

        return e.code().value();
    }
    return 0;

}
void process(boost::asio::ip::tcp::socket& sock)
{
    while (1) {
        std::string data = read_data(sock);
        std::cout << "Client's request is: " << data << std::endl;
        write_data(sock, data);
    }
}
std::string read_data(boost::asio::ip::tcp::socket& sock)
{
    boost::asio::streambuf buf;
    boost::asio::read_until(sock, buf, "\n");
    std::string data = boost::asio::buffer_cast<const char*>(buf.data());
    return data;
}
void write_data(boost::asio::ip::tcp::socket& sock, std::string data)
{
    boost::system::error_code error;
    std::string msg;
    int ch = data[0] - '0';
    switch (ch)
    {
    case 1: msg = "Case 1\n"; break;
    case 2: msg = "Case 2\n"; break;
    case 3: msg = "Case 3\n"; break;
    case 4: msg = "Case 4\n"; break;
    default: msg = "Case default\n"; break;
    }
    boost::asio::write(sock, boost::asio::buffer(msg + "\n"), error);
    if (!error) {
        std::cout << "Server sent hello message!" << std::endl;
    }
    else {
        std::cout << "send failed: " << error.message() << std::endl;
    }
}
void hearbeatSender(boost::asio::ip::tcp::socket& sock)
{
    boost::system::error_code error;
    std::string msg = "HEARTBEAT";
    while (1)
    {
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::cout << msg << std::endl;
        boost::asio::write(sock, boost::asio::buffer(msg + "\n"), error);
        if (!error) {
            std::cout << "Server sent HEARTBEAT message!" << std::endl;
        }
        else {
            std::cout << "send failed: " << error.message() << std::endl;
        }
    }
}

0
投票

在异步IO的情况下使用心跳... "sender "线程与async IO。

更重要的是,socket对象上没有同步,所以这是一个 数据竞赛 也就是 未定义的行为.

最后,这是不安全的。

    std::string data = boost::asio::buffer_cast<const char*>(buf.data());

它假设data()是以NUL-结束的(这不是真的)。

典型的单线程ASIO

你不会为定时器生成线程,而是使用例如 boost::asio::deadline_timerboost::asio::highresolution_timer. 它可以异步等待,所以你可以在IO服务上做其他任务,直到它到期。

同样,你也可以异步地进行requestresponse的读写。唯一的 "复杂 "因素是异步调用不会在返回之前完成,所以你必须确保缓冲区的寿命足够长(它们不应该是一个本地变量)。

现在,你已经有了一个逻辑上的寿命 "单位",这个单位几乎是从代码中跳出来的。

enter image description here

这简直就是在叫嚣着要重写成:

struct LifeTimeUnit {
    boost::asio::ip::tcp::socket sock;

    void process();
    std::string read_data();
    void write_data(std::string);
    void hearbeatSender(sock);
};

当然了 LifeTimeUnit 是一个有趣的名字,所以让我们想一个更好的名字。Session seems meaningful!


现在我们有了一个 lifetime 的单位,它可以帅气地包含其他东西,比如缓冲区和计时器。

struct Session {
    Session(tcp::socket&& s) : sock(std::move(s)) {}

    void start() {
        hb_wait();
        req_loop();
    }

    void cancel() {
        hbtimer.cancel();
        sock.cancel(); // or shutdown() e.g.
    }

  private:
    bool checked(error_code ec, std::string const& msg = "error") {
        if (ec) {
            std::clog << msg << ": " << ec.message() << "\n";
            cancel();
        }
        return !ec.failed();;
    }

    void req_loop(error_code ec = {}) {
        if (!checked(ec, "req_loop")) {
            async_read_until(sock, buf, "\n",
                    [this](error_code ec, size_t xfr) { on_request(ec, xfr); });
        }
    }

    void on_request(error_code ec, size_t n) {
        if (checked(ec, "on_request")) {
            request.resize(n);
            buf.sgetn(request.data(), n);

            response = "Case " + std::to_string(request.at(0) - '0') + "\n";
            async_write(sock, buffer(response), 
                    [this](error_code ec, size_t) { req_loop(ec); });
        }
    }

    void hb_wait(error_code ec = {}) {
        if (checked(ec, "hb_wait")) {
            hbtimer.expires_from_now(2s);
            hbtimer.async_wait([this](error_code ec) { hb_send(ec); });
        }
    }

    void hb_send(error_code ec) {
        if (checked(ec, "hb_send")) {
            async_write(sock, buffer(hbmsg), [this](error_code ec, size_t) { hb_wait(ec); });
        }
    }

    tcp::socket sock;
    boost::asio::high_resolution_timer hbtimer { sock.get_executor() };
    const std::string hbmsg = "HEARTBEAT\n";
    boost::asio::streambuf buf;
    std::string request, response;
};

唯一公开的东西是 start() (其实我们不需要) cancel() 目前,但你知道)。)

主程序可以不做太多改动。

tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();

tcp::socket sock(io);
acceptor.accept(sock);

Session sess(std::move(sock));
sess.start(); // does both request loop and the heartbeat

io.run();

没有线程了,完美的异步!使用 bashnetcat 来测试。

while sleep 4; do printf "%d request\n" {1..10}; done | netcat localhost 3333

打印:

host 3333
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
HEARTBEAT
HEARTBEAT
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
^C

停止客户机后,服务器退出时,会出现

on_request: End of file
hb_send: Operation canceled

单线程多段式

一个很大的优势是,现在你可以在一个服务器线程上接受多个客户端。事实上,数千个客户端同时运行都没有问题。

int main() {
    boost::asio::thread_pool io(1);
    try {
        tcp::acceptor acceptor(io, tcp::v4());
        acceptor.bind({{}, 3333});
        acceptor.listen();

        std::list<Session> sessions;

        while (true) {
            tcp::socket sock(io);
            acceptor.accept(sock);

            auto& sess = sessions.emplace_back(std::move(sock));
            sess.start(); // does both request loop and the heartbeat

            sessions.remove_if([](Session& s) { return !s.is_active(); });
        }

        io.join();
    } catch (boost::system::system_error& e) {
        std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.code().message() << "\n";
        return e.code().value();
    }
}

请注意,我们是如何巧妙地将执行上下文改为单人线程池的.这意味着我们仍然在单线程上运行所有会话,但那是一个不同的线程,与运行 main(),意味着我们可以继续接受连接。

为了避免不断增加的 sessions 列表,我们使用一个简单实现的 is_active() 属性。

请注意,我们几乎可以通过执行

for (auto& sess: sessions)
    sess.cancel();

这是ALMOST,因为它需要在池线程上发布取消操作。

for (auto& sess: sessions)
    post(io, [&sess] { sess.cancel(); });

这样做是为了避免与IO池中的任何任务发生冲突

因为只有主线才会触及 sessions 没有必要锁定。

活在科利鲁

测试与

for a in 3 2 1; do (sleep $a; echo "$a request" | nc 127.0.0.1 3333)& done; time wait

打印。

Case 1
Case 2
Case 3
HEARTBEAT
HEARTBEAT
...

多线程的胜利?

现在我们可以添加多线程了。这些变化是温和的。

  • 我们想把套接字与一个字符串关联起来(见 "多线程")。为什么在使用 boost::asio 时,我需要每个连接的 strand?)
  • 注意我们已经使用 sock的执行器来运行定时器
  • 我们必须采取额外的预防措施,使所有的公共接口在 Session 线程安全。

    • 发布行动从 start()cancel() 岸上
    • 做出 active 旗帜 atomic_bool
  • 接下来,我们简单地将池中的线程数从原来的 110

注意,在实践中,很少有意义使用比逻辑核更多的线程。另外,在这个简单的例子中,所有的东西都是IO绑定的,所以单线程可能已经起到了很好的作用。这只是为了演示

活在科利鲁

boost::asio::thread_pool io(10);
try {
    tcp::acceptor acceptor(io, tcp::v4());
    acceptor.set_option(tcp::acceptor::reuse_address(true));
    acceptor.bind({{}, 3333});
    acceptor.listen();

    std::list<Session> sessions;

    while (true) {

        tcp::socket sock(make_strand(io)); // NOTE STRAND!
// ...
// ...

    io.join();

和变化 Session:

   void start() {
        active = true;
        post(sock.get_executor(), [this]{
            hb_wait();
            req_loop();
        });
    }

    void cancel() {
        post(sock.get_executor(), [this]{
            hbtimer.cancel();
            sock.cancel(); // or shutdown() e.g.
            active = false;
        });
    }

// ....

    std::atomic_bool active {false};
}


0
投票

而不是这个。

try
    {
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        auto sock = acceptor.accept();
        std::thread t([&sock]() {
            hearbeatSender(sock);
        });
        process(sock);
        t.join();

    }

用它。

try{
        boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
        acceptor.bind(ep);
        acceptor.listen();
        boost::asio::ip::tcp::socket sock(io);
        acceptor.accept(sock);

        std::thread t([&sock]() {
            hearbeatSender(sock);
        });
        process(sock);
        t.join();
}

并包括头文件。

#include <thread>
#include <chrono>

(可选)你也可以使用 this_thread::sleep_for 而不是 sleep()std::this_thread::sleep_for(std::chrono::seconds(10));

解决了向线程传递socket的问题。

现在,在客户端和服务器之间进行HEARTBEAT对话。完整的代码可以从这里查看。

客户端代码 HEARTBEAT每5秒传输一次.

向客户机发出响应的服务器代码

© www.soinside.com 2019 - 2024. All rights reserved.