我正在用boost TCP编写一个客户服务器程序,其中我想每2秒向客户机发送一个HEARTBEAT消息,我正试图创建一个新的线程,以便于发送,但无法解决这个问题。我正在使用以下方法创建线程 boost::thread t(hearbeatSender,sock);
但却出现了很多错误,我也用bind来绑定函数名和socket,但没有解决这个错误。我也用bind将函数名与socket绑定,但没有解决这个错误。
void process(boost::asio::ip::tcp::socket & sock);
std::string read_data(boost::asio::ip::tcp::socket & sock);
void write_data(boost::asio::ip::tcp::socket & sock,std::string);
void hearbeatSender(boost::asio::ip::tcp::socket & sock);
int main()
{
unsigned short port_num = 3333;
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4::any(), port_num);
boost::asio::io_service io;
try
{
boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
acceptor.bind(ep);
acceptor.listen();
boost::asio::ip::tcp::socket sock(io);
acceptor.accept(sock);
boost::thread t(hearbeatSender,sock);
process(sock);
t.join();
}
catch (boost::system::system_error &e)
{
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}
void process(boost::asio::ip::tcp::socket & sock)
{
while(1){
std::string data = read_data(sock);
std::cout<<"Client's request is: "<<data<<std::endl;
write_data(sock,data);
}
}
std::string read_data(boost::asio::ip::tcp::socket & sock)
{
boost::asio::streambuf buf;
boost::asio::read_until(sock, buf, "\n");
std::string data = boost::asio::buffer_cast<const char*>(buf.data());
return data;
}
void write_data(boost::asio::ip::tcp::socket & sock,std::string data)
{
boost::system::error_code error;
std::string msg;
int ch = data[0]-'0';
switch(ch)
{
case 1: msg = "Case 1\n"; break;
case 2: msg = "Case 2\n"; break;
case 3: msg = "Case 3\n"; break;
case 4: msg = "Case 4\n"; break;
default: msg = "Case default\n"; break;
}
boost::asio::write( sock, boost::asio::buffer(msg+ "\n"), error );
if( !error ) {
std::cout << "Server sent hello message!" << std::endl;
}
else {
std::cout << "send failed: " << error.message() << std::endl;
}
}
void hearbeatSender(boost::asio::ip::tcp::socket & sock)
{
boost::system::error_code error;
std::string msg = "HEARTBEAT";
while(1)
{
sleep(2);
std::cout<<msg<<std::endl;
boost::asio::write( sock, boost::asio::buffer(msg+ "\n"), error );
if( !error ) {
std::cout << "Server sent HEARTBEAT message!" << std::endl;
}
else {
std::cout << "send failed: " << error.message() << std::endl;
}
}
}
这是一个服务器端的代码,用于响应客户端的消息并向客户端发送心跳。这是一个同步的TCP服务器。
而不是这样。
boost::asio::ip::tcp::socket sock(io);
acceptor.accept(sock);
boost::thread t(hearbeatSender,sock);
用这个来代替:
auto sock = acceptor.accept();
std::thread t([&sock]() {
hearbeatSender(sock);
});
取而代之的是 sleep
,只是使用std::this_thread::sleep进行普遍编译。
下面是编译和运行的完整程序。
#include <boost/asio.hpp>
#include <iostream>
void process(boost::asio::ip::tcp::socket& sock);
std::string read_data(boost::asio::ip::tcp::socket& sock);
void write_data(boost::asio::ip::tcp::socket& sock, std::string);
void hearbeatSender(boost::asio::ip::tcp::socket& sock);
int main()
{
unsigned short port_num = 3333;
boost::asio::ip::tcp::endpoint ep(boost::asio::ip::address_v4::any(), port_num);
boost::asio::io_service io;
try
{
boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
acceptor.bind(ep);
acceptor.listen();
auto sock = acceptor.accept();
std::thread t([&sock]() {
hearbeatSender(sock);
});
process(sock);
t.join();
}
catch (boost::system::system_error& e)
{
std::cout << "Error occured! Error code = " << e.code()
<< ". Message: " << e.what();
return e.code().value();
}
return 0;
}
void process(boost::asio::ip::tcp::socket& sock)
{
while (1) {
std::string data = read_data(sock);
std::cout << "Client's request is: " << data << std::endl;
write_data(sock, data);
}
}
std::string read_data(boost::asio::ip::tcp::socket& sock)
{
boost::asio::streambuf buf;
boost::asio::read_until(sock, buf, "\n");
std::string data = boost::asio::buffer_cast<const char*>(buf.data());
return data;
}
void write_data(boost::asio::ip::tcp::socket& sock, std::string data)
{
boost::system::error_code error;
std::string msg;
int ch = data[0] - '0';
switch (ch)
{
case 1: msg = "Case 1\n"; break;
case 2: msg = "Case 2\n"; break;
case 3: msg = "Case 3\n"; break;
case 4: msg = "Case 4\n"; break;
default: msg = "Case default\n"; break;
}
boost::asio::write(sock, boost::asio::buffer(msg + "\n"), error);
if (!error) {
std::cout << "Server sent hello message!" << std::endl;
}
else {
std::cout << "send failed: " << error.message() << std::endl;
}
}
void hearbeatSender(boost::asio::ip::tcp::socket& sock)
{
boost::system::error_code error;
std::string msg = "HEARTBEAT";
while (1)
{
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << msg << std::endl;
boost::asio::write(sock, boost::asio::buffer(msg + "\n"), error);
if (!error) {
std::cout << "Server sent HEARTBEAT message!" << std::endl;
}
else {
std::cout << "send failed: " << error.message() << std::endl;
}
}
}
在异步IO的情况下使用心跳... "sender "线程与async IO。
更重要的是,socket对象上没有同步,所以这是一个 数据竞赛 也就是 未定义的行为.
最后,这是不安全的。
std::string data = boost::asio::buffer_cast<const char*>(buf.data());
它假设data()是以NUL-结束的(这不是真的)。
你不会为定时器生成线程,而是使用例如 boost::asio::deadline_timer
或 boost::asio::highresolution_timer
. 它可以异步等待,所以你可以在IO服务上做其他任务,直到它到期。
同样,你也可以异步地进行requestresponse的读写。唯一的 "复杂 "因素是异步调用不会在返回之前完成,所以你必须确保缓冲区的寿命足够长(它们不应该是一个本地变量)。
现在,你已经有了一个逻辑上的寿命 "单位",这个单位几乎是从代码中跳出来的。
这简直就是在叫嚣着要重写成:
struct LifeTimeUnit {
boost::asio::ip::tcp::socket sock;
void process();
std::string read_data();
void write_data(std::string);
void hearbeatSender(sock);
};
当然了 LifeTimeUnit
是一个有趣的名字,所以让我们想一个更好的名字。Session
seems meaningful!
现在我们有了一个 lifetime 的单位,它可以帅气地包含其他东西,比如缓冲区和计时器。
struct Session {
Session(tcp::socket&& s) : sock(std::move(s)) {}
void start() {
hb_wait();
req_loop();
}
void cancel() {
hbtimer.cancel();
sock.cancel(); // or shutdown() e.g.
}
private:
bool checked(error_code ec, std::string const& msg = "error") {
if (ec) {
std::clog << msg << ": " << ec.message() << "\n";
cancel();
}
return !ec.failed();;
}
void req_loop(error_code ec = {}) {
if (!checked(ec, "req_loop")) {
async_read_until(sock, buf, "\n",
[this](error_code ec, size_t xfr) { on_request(ec, xfr); });
}
}
void on_request(error_code ec, size_t n) {
if (checked(ec, "on_request")) {
request.resize(n);
buf.sgetn(request.data(), n);
response = "Case " + std::to_string(request.at(0) - '0') + "\n";
async_write(sock, buffer(response),
[this](error_code ec, size_t) { req_loop(ec); });
}
}
void hb_wait(error_code ec = {}) {
if (checked(ec, "hb_wait")) {
hbtimer.expires_from_now(2s);
hbtimer.async_wait([this](error_code ec) { hb_send(ec); });
}
}
void hb_send(error_code ec) {
if (checked(ec, "hb_send")) {
async_write(sock, buffer(hbmsg), [this](error_code ec, size_t) { hb_wait(ec); });
}
}
tcp::socket sock;
boost::asio::high_resolution_timer hbtimer { sock.get_executor() };
const std::string hbmsg = "HEARTBEAT\n";
boost::asio::streambuf buf;
std::string request, response;
};
唯一公开的东西是 start()
(其实我们不需要) cancel()
目前,但你知道)。)
主程序可以不做太多改动。
tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();
tcp::socket sock(io);
acceptor.accept(sock);
Session sess(std::move(sock));
sess.start(); // does both request loop and the heartbeat
io.run();
没有线程了,完美的异步!使用 bash
和 netcat
来测试。
while sleep 4; do printf "%d request\n" {1..10}; done | netcat localhost 3333
打印:
host 3333
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
HEARTBEAT
HEARTBEAT
HEARTBEAT
Case 1
Case 2
Case 3
Case 4
Case 5
Case 6
Case 7
Case 8
Case 9
Case 1
^C
停止客户机后,服务器退出时,会出现
on_request: End of file
hb_send: Operation canceled
一个很大的优势是,现在你可以在一个服务器线程上接受多个客户端。事实上,数千个客户端同时运行都没有问题。
int main() {
boost::asio::thread_pool io(1);
try {
tcp::acceptor acceptor(io, tcp::v4());
acceptor.bind({{}, 3333});
acceptor.listen();
std::list<Session> sessions;
while (true) {
tcp::socket sock(io);
acceptor.accept(sock);
auto& sess = sessions.emplace_back(std::move(sock));
sess.start(); // does both request loop and the heartbeat
sessions.remove_if([](Session& s) { return !s.is_active(); });
}
io.join();
} catch (boost::system::system_error& e) {
std::cout << "Error occured! Error code = " << e.code() << ". Message: " << e.code().message() << "\n";
return e.code().value();
}
}
请注意,我们是如何巧妙地将执行上下文改为单人线程池的.这意味着我们仍然在单线程上运行所有会话,但那是一个不同的线程,与运行 main()
,意味着我们可以继续接受连接。
为了避免不断增加的 sessions
列表,我们使用一个简单实现的 is_active()
属性。
请注意,我们几乎可以通过执行
for (auto& sess: sessions) sess.cancel();
这是ALMOST,因为它需要在池线程上发布取消操作。
for (auto& sess: sessions) post(io, [&sess] { sess.cancel(); });
这样做是为了避免与IO池中的任何任务发生冲突
因为只有主线才会触及 sessions
没有必要锁定。
测试与
for a in 3 2 1; do (sleep $a; echo "$a request" | nc 127.0.0.1 3333)& done; time wait
打印。
Case 1
Case 2
Case 3
HEARTBEAT
HEARTBEAT
...
现在我们可以添加多线程了。这些变化是温和的。
sock
的执行器来运行定时器我们必须采取额外的预防措施,使所有的公共接口在 Session
线程安全。
start()
和 cancel()
岸上active
旗帜 atomic_bool
接下来,我们简单地将池中的线程数从原来的 1
曰 10
注意,在实践中,很少有意义使用比逻辑核更多的线程。另外,在这个简单的例子中,所有的东西都是IO绑定的,所以单线程可能已经起到了很好的作用。这只是为了演示
boost::asio::thread_pool io(10);
try {
tcp::acceptor acceptor(io, tcp::v4());
acceptor.set_option(tcp::acceptor::reuse_address(true));
acceptor.bind({{}, 3333});
acceptor.listen();
std::list<Session> sessions;
while (true) {
tcp::socket sock(make_strand(io)); // NOTE STRAND!
// ...
// ...
io.join();
和变化 Session
:
void start() {
active = true;
post(sock.get_executor(), [this]{
hb_wait();
req_loop();
});
}
void cancel() {
post(sock.get_executor(), [this]{
hbtimer.cancel();
sock.cancel(); // or shutdown() e.g.
active = false;
});
}
// ....
std::atomic_bool active {false};
}
而不是这个。
try
{
boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
acceptor.bind(ep);
acceptor.listen();
auto sock = acceptor.accept();
std::thread t([&sock]() {
hearbeatSender(sock);
});
process(sock);
t.join();
}
用它。
try{
boost::asio::ip::tcp::acceptor acceptor(io, ep.protocol());
acceptor.bind(ep);
acceptor.listen();
boost::asio::ip::tcp::socket sock(io);
acceptor.accept(sock);
std::thread t([&sock]() {
hearbeatSender(sock);
});
process(sock);
t.join();
}
并包括头文件。
#include <thread>
#include <chrono>
(可选)你也可以使用 this_thread::sleep_for
而不是 sleep()
std::this_thread::sleep_for(std::chrono::seconds(10));
解决了向线程传递socket的问题。
现在,在客户端和服务器之间进行HEARTBEAT对话。完整的代码可以从这里查看。