我正在尝试使用 Boost ASIO 库创建一个线程来侦听 C++ 中的 UDP 响应,这是我在类文件中对接收线程的定义:
tello::tello(boost::asio::io_service& io_service, boost::asio::ip::udp::socket& socket, const boost::asio::ip::udp::endpoint& remote_endpoint)
: ip(remote_endpoint.address().to_string()),
port(remote_endpoint.port()),
io_service(io_service),
socket(socket),
remote_endpoint(remote_endpoint)
{
}
void tello::ReceiveThread() {
while (isListening_) {
try {
char receivedData[1024];
boost::system::error_code error;
size_t len = socket.receive_from(boost::asio::buffer(receivedData), remote_endpoint, 0, error);
if (error && error != boost::asio::error::message_size) {
throw boost::system::system_error(error);
}
receivedData[len] = '\0';
spdlog::info("Received UDP data: {}", receivedData);
}
catch (const std::exception& e) {
std::cerr << "Error receiving UDP data: " << e.what() << std::endl;
}
}
}
void tello::StartListening() {
std::thread udpThread(&tello::ReceiveThread, this);
udpThread.detach(); // Detach the thread so it runs independently
}
void tello::StopListening() {
isListening_ = false;
}
在我的 main.cpp 中,我有这个片段来启动线程:
boost::asio::io_service io_service;
boost::asio::ip::udp::socket socket(io_service);
boost::asio::ip::udp::endpoint remote_endpoint(boost::asio::ip::address::from_string("192.168.0.1"), 12345); // Replace with the actual IP address and port
socket.open(boost::asio::ip::udp::v4());
tello drone(io_service, socket, remote_endpoint);
drone.StartListening();
代码编译没有错误,但尝试运行应用程序会导致控制台错误:
Error receiving UDP data: An invalid argument was supplied [system:10022 at C:\Users\(username)\boost_1_82_0\boost\asio\detail\win_iocp_socket_service.hpp:418:5 in function 'receive_from']
我对 Boost ASIO 非常陌生,希望获得有关创建 UDP 接收线程的正确方法的一些帮助。我使用的是带有 C++14 的 Windows 64 位。
你有数据竞争。
您启动一个线程,该线程引用
io_service
和socket
,它们是main
中的局部变量。然而,main
立即退出,破坏了那些局部变量。
由于引用过时,已分离的线程仍在运行,导致未定义的行为。
修复它
(a) avoiding the run-away thread
(b) also synchronizing on its termination
还修复了一些其他问题并编写了一些可能不太安全的代码(例如潜在的活泼的
isListening_
)。还有更多隐藏的问题
例如
startListening
可能更恰当地命名为startReceive
catch
块似乎效率低下,因为它只能处理本地抛出的错误#include <boost/asio.hpp>
#include <fmt/ranges.h>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
namespace spdlog {
void info(auto fmt, auto const&... args) { fmt::print(fmt::runtime(fmt), args...); }
} // namespace spdlog
struct tello {
tello(asio::io_context& io_context, udp::socket& socket, udp::endpoint const& remote_endpoint);
void StartListening();
void StopListening();
void ReceiveThread();
private:
std::string ip;
uint16_t port;
asio::io_context& io_context;
udp::socket& socket;
udp::endpoint remote_endpoint;
std::atomic_bool isListening_{true};
std::thread udpThread;
};
tello::tello(asio::io_context& io_context, udp::socket& socket, udp::endpoint const& remote_endpoint)
: ip(remote_endpoint.address().to_string())
, port(remote_endpoint.port())
, io_context(io_context)
, socket(socket)
, remote_endpoint(remote_endpoint) //
{
socket.bind(remote_endpoint);
}
void tello::ReceiveThread() {
for (char receivedData[1024]; isListening_;) {
try {
error_code error;
size_t len = socket.receive_from(asio::buffer(receivedData), remote_endpoint, 0, error);
if (error && error != asio::error::message_size) {
throw boost::system::system_error(error);
}
receivedData[len] = '\0';
spdlog::info("Received UDP data: {}", std::string_view(receivedData, len));
} catch (std::exception const& e) {
std::cerr << "Error receiving UDP data: " << e.what() << std::endl;
}
}
}
void tello::StartListening() { udpThread = std::thread(&tello::ReceiveThread, this); }
void tello::StopListening() {
isListening_ = false;
udpThread.join();
}
int main() {
asio::io_context io_context;
udp::socket socket(io_context);
udp::endpoint remote_endpoint({}, 12345);
socket.open(udp::v4());
tello drone(io_context, socket, remote_endpoint);
drone.StartListening();
std::this_thread::sleep_for(30s);
drone.StopListening();
}
本地演示:
您想使用异步 IO,这样您就可以获得更好的行为。讽刺的是,我认为它更简单一些(大约少了 25 LoC):
#include <boost/asio.hpp>
#include <fmt/ranges.h>
#include <iostream>
namespace asio = boost::asio;
using asio::ip::udp;
using boost::system::error_code;
using namespace std::chrono_literals;
namespace spdlog {
void info(auto fmt, auto const&... args) { fmt::print(fmt::runtime(fmt), args...); }
} // namespace spdlog
struct tello {
tello(asio::any_io_executor ex, udp::endpoint ep);
void Start() { readLoop(); }
void Stop();
private:
void readLoop() {
socket_.async_receive_from(
asio::buffer(buffer_), remote_endpoint_, [this](error_code ec, size_t len) {
if (ec && ec != asio::error::message_size) {
std::cerr << "Error receiving UDP data: " << ec.message() << std::endl;
} else {
spdlog::info("Received UDP data: {}", std::string_view(buffer_.data(), len));
}
});
}
std::string ip_;
uint16_t port_;
udp::socket socket_;
udp::endpoint remote_endpoint_;
std::array<char, 1024> buffer_;
};
tello::tello(asio::any_io_executor ex, udp::endpoint ep)
: ip_(ep.address().to_string())
, port_(ep.port())
, socket_(ex, ep) {}
void tello::Stop() { socket_.cancel(); }
int main() {
asio::io_context ioc;
tello drone(ioc.get_executor(), {{}, 12345});
drone.Start();
ioc.run_for(30s);
// not really needed:
drone.Stop();
ioc.run();
}
还有更多本地演示: