我有一个简单的类,其作用是持续通过 UDP 读取数据,并每隔 X 秒发送一个请求数据包。我尝试用 `boost::asio::deadline_timer` 实现定时发送请求,但代码的行为开始变得很奇怪:在计时器第一次触发之前,它能正确读取传入的数据包;之后它就停止读取,套接字只发送数据。我已确认问题不在所连接的设备一侧。
这是我的代码摘要:
powerswitch.h:
#pragma once
/* includes */
// Declaration of PowerSwitch: a UDP client that receives response datagrams
// asynchronously and sends a request packet on a timer.
namespace a
{
using std::thread;
using std::unique_ptr;
using std::array;
class PowerSwitch
{
// Packed wire formats; their fields are elided in this excerpt.
struct [[gnu::packed]] RequestPacket { /* ... */ };
struct [[gnu::packed]] ResponsePacket { /* ... */ };
// Decoded per-channel state stored in m_channels.
struct ChannelData { /* ... */ };
public:
// Opens the socket, starts the request timer and the receive loop (via configure()).
explicit PowerSwitch(string_view ipv4, u16 port, boost::asio::io_context& context, std::chrono::seconds request_interval);
~PowerSwitch();
// Sends a RequestPacket for `channel` to m_target.
auto toggle_channel(int channel) -> void;
// Closes the socket.
auto stop() -> void;
private:
// (Re)binds the socket, sets the target and interval, and starts the timer/read loops.
auto configure(string_view ipv4, u16 port, std::chrono::seconds request_interval) -> expected<void, string>;
// Sends the periodic request (a toggle of DUMMY_CHANNEL).
auto request() -> void;
// Starts one asynchronous receive whose handler re-arms itself.
auto read() -> void;
// Decodes the first `bytes` bytes of m_buffer into m_channels.
auto handle_incoming(usize bytes) -> void;
// Sends a request and re-arms m_timer.
auto handle_timer() -> void;
// Sends `data` to m_target.
auto write(string_view data) -> void;
private:
boost::asio::ip::udp::socket m_socket;
// Initialised as the local endpoint, but also passed to async_receive_from()
// as the sender endpoint in read().
boost::asio::ip::udp::endpoint m_endpoint;
// Remote device the requests are sent to.
boost::asio::ip::udp::endpoint m_target;
boost::asio::deadline_timer m_timer;
std::chrono::seconds m_request_interval;
// Receive buffer shared by every pending read.
array<u8, 1024> m_buffer;
array<ChannelData, 8> m_channels;
};
}
powerswitch.cpp:
/* includes */
using std::span;
using std::vector;
namespace asio = boost::asio;
namespace a
{
// Local UDP port the socket is bound to in configure().
constexpr u16 LOCAL_PORT = 12000;
// Channel number used by request() for the periodic request packet.
constexpr u16 DUMMY_CHANNEL = 9'999;
// Value written into RequestPacket::marker.
constexpr auto REQUEST_MARKER = 0xAAAAAAAA;
// Constructs the switch client on `context` and immediately calls configure().
// The socket and timer built in the initializer list are placeholders:
// configure() closes the socket, reopens it on LOCAL_PORT and replaces the timer.
PowerSwitch::PowerSwitch(
const string_view ipv4,
const u16 port,
asio::io_context& context,
const std::chrono::seconds request_interval
)
: m_socket(context, asio::ip::udp::endpoint()),
m_endpoint(this->m_socket.local_endpoint()),
m_timer(context, boost::posix_time::seconds(request_interval.count())),
m_request_interval(request_interval),
m_buffer(array<u8, 1024>()),
m_channels(array<ChannelData, 8>())
{
// A configuration failure is only logged; the object is still constructed.
this->configure(ipv4, port, request_interval)
.map_error([](const auto& e){ llog::error("failed to initialize powerswitch: {}", e); });
}
// Closes the socket on destruction (see stop()).
PowerSwitch::~PowerSwitch() { this->stop(); }
// Builds a RequestPacket for `channel` and hands its bytes to write().
auto PowerSwitch::toggle_channel(const int channel) -> void
{
// NOTE(review): read() passes m_endpoint to async_receive_from() as the sender
// endpoint, so after the first received datagram m_endpoint.port() is the
// remote sender's port rather than LOCAL_PORT.
const auto packet = RequestPacket {
.marker = REQUEST_MARKER,
.channel = static_cast<u16>(channel),
.response_port = this->m_endpoint.port(),
.checksum = 0x0000
};
// NOTE(review): `packet` is a local; write() starts async_send_to() on a view
// of it, so the buffer can be read after this function has returned.
this->write({ reinterpret_cast<const char*>(&packet), sizeof(packet) });
}
// Closes the socket and logs the address currently stored in m_endpoint.
auto PowerSwitch::stop() -> void
{
this->m_socket.close();
llog::trace("closing connection to {}", this->m_endpoint.address().to_string());
}
// Re-initialises the connection: closes the socket, binds it to the local
// address on LOCAL_PORT, stores the target and interval, restarts the timer
// loop and starts a read.
// Returns an error string if any step inside the try block throws.
auto PowerSwitch::configure(
const string_view ipv4,
const u16 port,
const std::chrono::seconds request_interval
) -> expected<void, string>
{
this->stop();
try {
this->m_endpoint = asio::ip::udp::endpoint(
asio::ip::make_address_v4(ip::Ipv4::local_address_unchecked().address),
LOCAL_PORT
);
llog::trace("opening socket at {}:{}", this->m_endpoint.address().to_string(), this->m_endpoint.port());
this->m_socket.open(this->m_endpoint.protocol());
this->m_socket.bind(this->m_endpoint);
this->m_request_interval = request_interval;
this->m_target = asio::ip::udp::endpoint(
asio::ip::make_address_v4(ipv4),
port
);
this->m_timer = asio::deadline_timer(
this->m_socket.get_executor(),
boost::posix_time::seconds(request_interval.count())
);
// Sends the first request right away; through request() -> toggle_channel()
// -> write() this already starts one read().
this->handle_timer();
llog::debug("powerswitch service started at {}:{} (receiving from {}:{})",
this->m_socket.local_endpoint().address().to_string(),
this->m_socket.local_endpoint().port(),
this->m_target.address().to_string(),
this->m_target.port()
);
} catch(const std::exception& e) {
return Err("exception: {}", e.what());
}
// NOTE(review): a second read() on the same buffer, in addition to the one
// started by write() above.
this->read();
return {};
}
// Sends the periodic request: a toggle packet addressed to DUMMY_CHANNEL.
auto PowerSwitch::request() -> void {
llog::trace("powerswitch: sending planned request");
this->toggle_channel(DUMMY_CHANNEL);
}
// Starts one asynchronous receive into m_buffer. On success the handler
// decodes the datagram and starts the next receive; on error it logs and the
// chain ends.
auto PowerSwitch::read() -> void
{
this->m_socket.async_receive_from(
asio::buffer(this->m_buffer),
// NOTE(review): this argument is the *sender* endpoint output of
// async_receive_from(); each completed receive overwrites m_endpoint, which
// toggle_channel() later reads for response_port.
this->m_endpoint,
[this](const auto& ec, const auto& bytes_transferred)
{
if(ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
this->handle_incoming(bytes_transferred);
this->read();
}
);
}
// Interprets m_buffer as eight ResponsePacket records and copies each into
// m_channels, then logs the first channel.
auto PowerSwitch::handle_incoming(const usize bytes) -> void
{
// `bytes` only limits the span; the loop below always walks eight packets,
// regardless of how many bytes were actually received.
const auto raw = span(this->m_buffer).first(bytes);
for(const auto datagram = reinterpret_cast<array<ResponsePacket, 8>*>(raw.data());
const auto& [marker, channel, enabled, voltage, current] : *datagram) {
// NOTE(review): `channel` comes from the network and indexes m_channels
// (8 entries) without a range check; `marker` is not validated.
this->m_channels[channel] = ChannelData {
.voltage = static_cast<f32>(voltage) / 1'000.0f,
.current = static_cast<f32>(current),
.enabled = static_cast<bool>(enabled)
};
}
llog::trace("[:{} {}V {} mA]", this->m_channels.front().enabled, this->m_channels.front().voltage, this->m_channels.front().current);
}
// Sends a request now, then re-arms m_timer for m_request_interval; the wait
// handler calls this function again unless the wait reported an error.
auto PowerSwitch::handle_timer() -> void
{
this->request();
this->m_timer.expires_from_now(boost::posix_time::seconds(this->m_request_interval.count()));
this->m_timer.async_wait(
[this](const auto& ec)
{
if(ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
this->handle_timer();
}
);
}
// Starts an asynchronous send of `data` to m_target and logs the result in
// the completion handler.
auto PowerSwitch::write(const string_view data) -> void
{
// NOTE(review): asio::buffer() does not copy; `data` must stay valid until the
// handler runs, but toggle_channel() passes a view of a local variable.
this->m_socket.async_send_to(
asio::buffer(data),
this->m_target,
[this](const auto& ec, const auto& bytes_transferred)
{
if(ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
llog::trace("powerswitch: sent {} bytes to {}:{}",
bytes_transferred,
this->m_target.address().to_string(),
this->m_target.port()
);
}
);
// NOTE(review): every write starts an additional receive chain on the same
// m_buffer / m_endpoint, on top of the self-re-arming one started by read().
this->read();
}
}
main.cpp:
// Body of main() (excerpt): builds the io_context and the PowerSwitch, then
// runs the event loop on this thread.
auto context = boost::asio::io_context();
auto powerswitch = a::PowerSwitch(
"192.168.1.50",
44000,
context,
std::chrono::seconds(5)
);
// Blocks until the io_context runs out of work.
context.run();
return 0;
我通过上面的代码得到了以下输出:
[ trace ] [thread 7144 ]: opening socket at 192.168.1.10:12000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ debug ] [thread 7144 ]: powerswitch service started at 192.168.1.10:12000 (receiving from 192.168.1.50:44000)
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: [:false 11.961V 477 mA]
[ trace ] [thread 7144 ]: [:false 11.934V 493 mA]
[ trace ] [thread 7144 ]: [:false 11.974V 728 mA]
[ trace ] [thread 7144 ]: [:false 12.006V 543 mA]
[ trace ] [thread 7144 ]: [:false 12.004V 543 mA]
[ trace ] [thread 7144 ]: [:false 11.953V 692 mA]
[ trace ] [thread 7144 ]: [:false 11.959V 491 mA]
[ trace ] [thread 7144 ]: [:false 12.063V 583 mA]
[ trace ] [thread 7144 ]: [:false 11.833V 615 mA]
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: [:false 12.075V 613 mA]
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
[ trace ] [thread 7144 ]: powerswitch: sending planned request
[ trace ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000
如果有任何帮助或建议,我将不胜感激。
问题很多。大部分都是因为过于复杂。
引起我注意的第一个大问题是:你在 `write` 方法中使用了 `toggle_channel` 的局部变量 `packet`。这会导致未定义行为,因为异步写操作会在该变量的生命周期结束之后才使用它。
老实说,由于 UDP 本质上是即发即忘的,所以我认为没有必要在这里使用异步操作。您只需使用同步
send_to
方法即可完成。
您的 `read()` 循环和 `write()` 都会再发起新的 `read()` 操作,这意味着最终会有许多读取操作同时处于挂起状态。
析构函数中的 `stop()` 实际上没什么用:只有在 io_context 把所有工作执行完之后才会运行到它,这意味着那时已经没有什么需要停止的了。
您可能需要一个 `cancel()` 方法来取消所有挂起的操作,然后等待它们完成。
当您处理传入消息时,您必须将其视为不受信任的输入。这意味着您应该在使用它之前对其进行验证。最重要的是,您应该验证通道索引是否在范围内。使用
std::array::at
而不是 std::array::operator[]
将为您提供边界检查。
简化
handle_incoming
方法可能看起来像:
// Simplified decoder: views the received bytes as whole ResponsePacket records
// and stores each one in m_channels.
void PowerSwitch::handle_incoming(usize bytes) {
// Integer division: only complete packets are visited.
auto n_reponses = bytes / sizeof(ResponsePacket);
auto responses = std::span<ResponsePacket const>(
reinterpret_cast<ResponsePacket const*>(m_buffer.data()), n_reponses);
for (auto& [marker, channel, enabled, voltage, current] : responses) {
// at() is bounds-checked, so an out-of-range channel throws instead of
// writing outside m_channels.
m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
.current = static_cast<f32>(current),
.enabled = static_cast<bool>((enabled))};
}
auto& chan = m_channels.front();
llog::trace("[:{} {}V {} mA]", chan.enabled, chan.voltage, chan.current);
}
请注意,这假定您已确保缓冲区按 `ResponsePacket` 的对齐要求正确对齐:
// Receive buffer, value-initialised and aligned for ResponsePacket.
// NOTE(review): ResponsePacket is [[gnu::packed]], so its alignment requirement
// is presumably 1 and this alignas may have no effect — confirm.
alignas(ResponsePacket) std::array<u8, 1024> m_buffer{};
稍微扩展一下检查:
// Decoder with validation: packets with an unexpected marker or an
// out-of-range channel are logged and skipped instead of being stored.
void PowerSwitch::handle_incoming(usize bytes) {
// Integer division: only complete packets are visited.
auto n_reponses = bytes / sizeof(ResponsePacket);
auto responses = std::span<ResponsePacket const>(
reinterpret_cast<ResponsePacket const*>(m_buffer.data()), n_reponses);
for (auto& [marker, channel, enabled, voltage, current] : responses) {
switch (marker) {
case RESPONSE_MARKER: {
if (channel >= m_channels.size()) {
llog::warn("powerswitch: invalid channel: {:#x}", channel);
break;
}
m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
.current = static_cast<f32>(current),
.enabled = static_cast<bool>((enabled))};
break;
}
default: llog::warn("powerswitch: invalid response marker: {:#x}", marker); break;
}
}
auto& chan = m_channels.front();
llog::trace("[:{} {}V {} mA]", chan.enabled, chan.voltage, chan.current);
}
这是我修复这些问题的尝试。为了让示例能够独立编译运行,我自行补充(虚构)了一些细节,并在此过程中简化了许多地方。
// Decoder with validation (same code as the preceding snippet): packets with
// an unexpected marker or an out-of-range channel are logged and skipped.
void PowerSwitch::handle_incoming(usize bytes) {
// Integer division: only complete packets are visited.
auto n_reponses = bytes / sizeof(ResponsePacket);
auto responses = std::span<ResponsePacket const>(
reinterpret_cast<ResponsePacket const*>(m_buffer.data()), n_reponses);
for (auto& [marker, channel, enabled, voltage, current] : responses) {
switch (marker) {
case RESPONSE_MARKER: {
if (channel >= m_channels.size()) {
llog::warn("powerswitch: invalid channel: {:#x}", channel);
break;
}
m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
.current = static_cast<f32>(current),
.enabled = static_cast<bool>((enabled))};
break;
}
default: llog::warn("powerswitch: invalid response marker: {:#x}", marker); break;
}
}
auto& chan = m_channels.front();
llog::trace("[:{} {}V {} mA]", chan.enabled, chan.voltage, chan.current);
}
#include <boost/asio.hpp>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <fmt/ranges.h>
#include <iostream>
#include <span>
#include <string_view>
namespace asio = boost::asio;
using namespace std::chrono_literals;
// Lets fmt print udp::endpoint values through their stream operator<<.
template <> struct fmt::formatter<asio::ip::udp::endpoint> : ostream_formatter {};
namespace llog {
template <typename... Args> constexpr auto trace(auto const& fmt, Args const&... args) -> void {
std::cout << "TRACE\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
}
template <typename... Args> constexpr auto debug(auto const& fmt, Args const&... args) -> void {
std::cout << "DEBUG\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
}
template <typename... Args> constexpr auto error(auto const& fmt, Args const&... args) -> void {
std::cout << "ERROR\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
}
template <typename... Args> constexpr auto warn(auto const& fmt, Args const&... args) -> void {
std::cout << "WARN\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
}
} // namespace llog
// Self-contained version of the PowerSwitch client: receives response
// datagrams asynchronously and sends a request packet every m_request_interval.
namespace mylib {
using udp = asio::ip::udp;
using boost::system::error_code;
using f32 = float;
using u32 = uint32_t;
using u16 = uint16_t;
using u8 = uint8_t;
using usize = size_t;
using duration = std::chrono::steady_clock::duration;
class PowerSwitch {
// Request sent to the device (10 bytes, see static_assert below).
struct [[gnu::packed]] RequestPacket {
u32 marker;
u16 channel;
u16 response_port;
u16 checksum;
};
// One per-channel record in a response datagram (12 bytes).
struct [[gnu::packed]] ResponsePacket {
u32 marker;
u16 channel;
u16 enabled;
u16 voltage;
u16 current;
};
// Decoded channel state kept in m_channels.
struct ChannelData {
f32 voltage;
f32 current;
bool enabled;
};
// The packets are reinterpreted to/from raw bytes, so pin down their size and
// require trivial, standard-layout types.
static_assert(sizeof(mylib::PowerSwitch::RequestPacket) == 10);
static_assert(sizeof(mylib::PowerSwitch::ResponsePacket) == 12);
static_assert(std::is_trivial_v<mylib::PowerSwitch::RequestPacket>);
static_assert(std::is_trivial_v<mylib::PowerSwitch::ResponsePacket>);
static_assert(std::is_standard_layout_v<mylib::PowerSwitch::RequestPacket>);
static_assert(std::is_standard_layout_v<mylib::PowerSwitch::ResponsePacket>);
public:
// Binds the socket, sends the first request and starts both loops.
explicit PowerSwitch(std::string_view ipv4, u16 port, asio::io_context& context,
duration request_interval);
~PowerSwitch();
// Sends a RequestPacket for `channel` to m_target.
void toggle_channel(int channel);
// Cancels the timer wait and the pending socket operations.
void cancel();
private:
// NOTE(review): declared but not defined in this listing.
void configure(std::string_view ipv4, u16 port, duration request_interval);
void send_request();
void read_loop();
void handle_incoming(usize bytes);
void timed_request_loop();
void write(std::span<std::byte const> data);
private:
// Members are initialised in declaration order: m_endpoint needs m_socket,
// and m_timer needs both m_socket and m_request_interval, so keep this order.
udp::socket m_socket;
// Starts as the socket's local endpoint; read_loop() also passes it to
// async_receive_from() as the sender endpoint.
udp::endpoint m_endpoint = m_socket.local_endpoint();
duration m_request_interval;
udp::endpoint m_target;
asio::steady_timer m_timer{m_socket.get_executor(), m_request_interval};
std::array<ChannelData, 8> m_channels{};
// NOTE(review): ResponsePacket is packed, so this alignas presumably requests
// alignment 1 — confirm.
alignas(ResponsePacket) std::array<u8, 1024> m_buffer{};
};
} // namespace mylib
namespace mylib {
// Local UDP port the socket is bound to.
constexpr u16 LOCAL_PORT = 12000;
// Channel number used for the periodic request.
constexpr u16 DUMMY_CHANNEL = 9'999;
// Marker written into outgoing RequestPackets.
constexpr auto REQUEST_MARKER = 0xAAAAAAAA;
// Marker expected in incoming ResponsePackets; the value is made up for this demo.
constexpr auto RESPONSE_MARKER = 0x55555555;
// Opens and binds the socket to LOCAL_PORT on the default (unspecified)
// address, stores the interval and target, then sends the first request and
// starts the receive loop.
// make_address_v4() throws if `ipv4` is not a valid dotted address.
PowerSwitch::PowerSwitch(std::string_view ipv4, u16 port, asio::io_context& context,
duration request_interval)
: m_socket(context, {{}, LOCAL_PORT})
, m_request_interval(request_interval)
, m_target(asio::ip::make_address_v4(ipv4), port) {
// The socket is already open here; this only reports the bound endpoint.
llog::trace("opening socket at {}", m_endpoint);
// Order matters: the request goes out before the single read chain starts.
timed_request_loop();
read_loop();
llog::debug("powerswitch service started at {} (receiving from {})", m_endpoint, m_target);
}
// Cancels outstanding operations on destruction.
// NOTE(review): the cancelled handlers capture `this` and are still invoked
// later by the io_context — confirm the object outlives context.run().
PowerSwitch::~PowerSwitch() { cancel(); }
void PowerSwitch::toggle_channel(int channel) {
RequestPacket packet[]{
{.marker = REQUEST_MARKER,
.channel = static_cast<u16>(channel),
.response_port = m_endpoint.port(),
.checksum = 0x0000},
};
write(std::as_bytes(std::span(packet)));
}
void PowerSwitch::cancel() {
llog::trace("closing connection to {}", m_endpoint);
m_timer.cancel();
m_socket.cancel();
}
void PowerSwitch::send_request() {
llog::trace("powerswitch: sending planned request");
toggle_channel(DUMMY_CHANNEL);
}
void PowerSwitch::read_loop() {
m_socket.async_receive_from( //
asio::buffer(m_buffer), m_endpoint, [this](error_code ec, size_t bytes_transferred) {
if (ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
handle_incoming(bytes_transferred);
read_loop();
});
}
/// Decodes the datagram in the first `bytes` bytes of m_buffer.
///
/// The payload is treated as untrusted input: it is read as a sequence of
/// whole ResponsePacket records, and each record is stored only if its marker
/// and channel index are valid. Invalid records and an incomplete trailing
/// record are logged and skipped.
///
/// @param bytes Number of bytes received into m_buffer.
void PowerSwitch::handle_incoming(usize bytes) {
    auto const n_responses = bytes / sizeof(ResponsePacket);
    if (auto const trailing = bytes % sizeof(ResponsePacket); trailing != 0) {
        // Do not drop a truncated record silently.
        llog::warn("powerswitch: ignoring {} trailing bytes of an incomplete response", trailing);
    }

    auto const responses = std::span<ResponsePacket const>(
        reinterpret_cast<ResponsePacket const*>(m_buffer.data()), n_responses);
    llog::debug("powerswitch: received {} responses in {} bytes", n_responses, bytes);

    for (auto const& [marker, channel, enabled, voltage, current] : responses) {
        if (marker != RESPONSE_MARKER) {
            llog::warn("powerswitch: invalid response marker: {:#x}", marker);
            continue;
        }
        if (channel >= m_channels.size()) {
            llog::warn("powerswitch: invalid channel: {:#x}", channel);
            continue;
        }

        auto& c = m_channels.at(channel);
        // The code scales the raw voltage by 1/1000 and stores the current unscaled.
        c = {.voltage = voltage / 1'000.0f,
             .current = static_cast<f32>(current),
             .enabled = static_cast<bool>(enabled)};
        llog::debug("powerswitch: updated channel {} [:{} {}V {} mA]", channel,
                    c.enabled, c.voltage, c.current);
    }
}
void PowerSwitch::timed_request_loop() {
send_request();
m_timer.expires_from_now(m_request_interval);
m_timer.async_wait([this](error_code ec) {
if (ec) {
llog::error("powerswitch: error: {}", ec.what());
return;
}
timed_request_loop();
});
}
/// Sends `data` to m_target with a blocking send_to().
/// A failure is logged as an error; the byte count and the error message text
/// are traced in every case.
void PowerSwitch::write(std::span<std::byte const> data) {
    error_code ec;
    auto const sent = m_socket.send_to(asio::buffer(data), m_target, {}, ec);

    if (ec) {
        llog::error("powerswitch: error: {}", ec.what());
    }

    llog::trace("powerswitch: sent {} bytes to {} ({})", sent, m_target, ec.message());
}
} // namespace mylib
// Demo driver: runs a PowerSwitch against localhost until SIGINT/SIGTERM,
// then cancels its pending operations so that run() can return.
int main() {
    auto context = asio::io_context();

    // Real device: mylib::PowerSwitch("192.168.1.50", 44000, context, 5s)
    auto powerswitch = mylib::PowerSwitch("127.0.0.1", 44000, context, 5s);

    asio::signal_set signals(context, SIGINT, SIGTERM);
    signals.async_wait([&](mylib::error_code ec, int signal) {
        if (ec) {
            return;
        }
        llog::debug("received signal {}", signal);
        // Alternative: context.stop();
        powerswitch.cancel();
    });

    context.run();
}
本地演示: