Boost::Asio 截止时间计时器阻止 UDP 套接字读取

问题描述 投票:0回答:1

我有一个简单的类,其目的是始终通过 UDP 读取数据,并每 X 秒发送一个请求数据包。我尝试通过

boost::asio::deadline_timer
实现计划的请求发送,但是我的代码开始表现得很奇怪:它正确读取传入的数据包,直到计时器第一次触发,然后它停止读取并且套接字只发送数据。我确认问题不在进行连接的设备一侧。

这是我的代码摘要:

PowerSwitch.h:

#pragma once

/* includes */

namespace a
{
  using std::thread;
  using std::unique_ptr;
  using std::array;

  /// Client for a UDP-controlled power switch: sends request packets to the
  /// device, reads its response datagrams and keeps a table of per-channel
  /// readings. All asynchronous work runs on the io_context passed to the
  /// constructor.
  class PowerSwitch 
  {
    // Packed wire formats of outgoing requests and incoming responses
    // (field lists are elided in this excerpt).
    struct [[gnu::packed]] RequestPacket { /* ... */ };
    struct [[gnu::packed]] ResponsePacket { /* ... */ };
    // Decoded state of one channel, filled in by handle_incoming().
    struct ChannelData { /* ... */ };

    public:
      /// @param ipv4             IPv4 address of the device, as text.
      /// @param port             UDP port of the device.
      /// @param context          io_context that runs the socket and timer handlers.
      /// @param request_interval Period between two planned requests.
      explicit PowerSwitch(string_view ipv4, u16 port, boost::asio::io_context& context, std::chrono::seconds request_interval);
      /// Calls stop().
      ~PowerSwitch();

      /// Sends a request packet for the given channel number to the device.
      auto toggle_channel(int channel) -> void;
      /// Closes the socket.
      auto stop() -> void;

    private:
      /// (Re)binds the socket, stores the target, and starts the timer and read loops.
      /// Returns an error string if any step throws std::exception.
      auto configure(string_view ipv4, u16 port, std::chrono::seconds request_interval) -> expected<void, string>;
      /// Sends the periodic request (a toggle of DUMMY_CHANNEL).
      auto request() -> void;
      /// Starts one asynchronous receive; the handler re-arms itself on success.
      auto read() -> void;
      /// Decodes the first `bytes` bytes of m_buffer into m_channels.
      auto handle_incoming(usize bytes) -> void;
      /// Sends a request and re-arms m_timer for the next one.
      auto handle_timer() -> void;
      /// Sends `data` to m_target.
      auto write(string_view data) -> void;

    private:
      boost::asio::ip::udp::socket m_socket;
      // Endpoint the socket is bound to in configure(); its port is sent as
      // response_port in every request.
      boost::asio::ip::udp::endpoint m_endpoint;
      // Address and port of the device that requests are sent to.
      boost::asio::ip::udp::endpoint m_target;
      // Drives the periodic request in handle_timer().
      boost::asio::deadline_timer m_timer;
      std::chrono::seconds m_request_interval;
      // Receive buffer shared by all read() operations.
      array<u8, 1024> m_buffer;
      // Latest decoded reading per channel, indexed by channel number.
      array<ChannelData, 8> m_channels;
  };
}

PowerSwitch.cpp:

/* includes */

using std::span;
using std::vector;
namespace asio = boost::asio;

namespace a
{
  // Local UDP port the socket is bound to in configure().
  constexpr u16 LOCAL_PORT = 12000;
  // Channel number sent by the periodic request() through toggle_channel().
  constexpr u16 DUMMY_CHANNEL = 9'999;
  // Value written to the `marker` field of every RequestPacket.
  constexpr auto REQUEST_MARKER = 0xAAAAAAAA;

  PowerSwitch::PowerSwitch(
    const string_view ipv4,
    const u16 port,
    asio::io_context& context,
    const std::chrono::seconds request_interval
  )
    : m_socket(context, asio::ip::udp::endpoint()),
      m_endpoint(this->m_socket.local_endpoint()),
      m_timer(context, boost::posix_time::seconds(request_interval.count())),
      m_request_interval(request_interval),
      m_buffer(array<u8, 1024>()),
      m_channels(array<ChannelData, 8>())
  {
    this->configure(ipv4, port, request_interval)
      .map_error([](const auto& e){ llog::error("failed to initialize powerswitch: {}", e); });
  }

  /// Shuts the switch down by delegating to stop().
  PowerSwitch::~PowerSwitch()
  {
    stop();
  }

  auto PowerSwitch::toggle_channel(const int channel) -> void
  {
    const auto packet = RequestPacket {
      .marker = REQUEST_MARKER,
      .channel = static_cast<u16>(channel),
      .response_port = this->m_endpoint.port(),
      .checksum = 0x0000
    };

    this->write({ reinterpret_cast<const char*>(&packet), sizeof(packet) });
  }

  auto PowerSwitch::stop() -> void
  {
    this->m_socket.close();
    llog::trace("closing connection to {}", this->m_endpoint.address().to_string());
  }

  /// (Re)initialises the connection: binds the socket to the local address on
  /// LOCAL_PORT, stores the device endpoint, restarts the request timer and
  /// starts the receive loop.
  ///
  /// @param ipv4             IPv4 address of the device, as text.
  /// @param port             UDP port of the device.
  /// @param request_interval Period between two planned requests.
  /// @return Empty on success; an error string if any step inside the try
  ///         block throws std::exception.
  auto PowerSwitch::configure(
    const string_view ipv4,
    const u16 port,
    const std::chrono::seconds request_interval
  ) -> expected<void, string>
  {
    // Shut down whatever was open before; runs outside the try block below.
    this->stop();
    try {
      // Local endpoint: this machine's address (project helper) on the fixed LOCAL_PORT.
      this->m_endpoint = asio::ip::udp::endpoint(
        asio::ip::make_address_v4(ip::Ipv4::local_address_unchecked().address),
        LOCAL_PORT
      );

      llog::trace("opening socket at {}:{}", this->m_endpoint.address().to_string(), this->m_endpoint.port());

      this->m_socket.open(this->m_endpoint.protocol());
      this->m_socket.bind(this->m_endpoint);
      this->m_request_interval = request_interval;

      // Device endpoint that write() sends to; make_address_v4 throws on malformed text.
      this->m_target = asio::ip::udp::endpoint(
        asio::ip::make_address_v4(ipv4),
        port
      );

      // Replace the timer with a fresh one on the socket's executor, then send
      // the first request and arm the timer for the next one.
      this->m_timer = asio::deadline_timer(
        this->m_socket.get_executor(),
        boost::posix_time::seconds(request_interval.count())
      );
      this->handle_timer();

      llog::debug("powerswitch service started at {}:{} (receiving from {}:{})",
        this->m_socket.local_endpoint().address().to_string(),
        this->m_socket.local_endpoint().port(),
        this->m_target.address().to_string(),
        this->m_target.port()
      );
    } catch(const std::exception& e) {
      return Err("exception: {}", e.what());
    }
    // Only reached when setup succeeded: start receiving datagrams.
    this->read();
    return {};
  }

  auto PowerSwitch::request() -> void {
    llog::trace("powerswitch: sending planned request");
    this->toggle_channel(DUMMY_CHANNEL);
  }

  auto PowerSwitch::read() -> void
  {
    this->m_socket.async_receive_from(
      asio::buffer(this->m_buffer),
      this->m_endpoint,
      [this](const auto& ec, const auto& bytes_transferred)
      {
        if(ec) {
          llog::error("powerswitch: error: {}", ec.what());
          return;
        }
        this->handle_incoming(bytes_transferred);
        this->read();
      }
    );
  }

  auto PowerSwitch::handle_incoming(const usize bytes) -> void
  {
    const auto raw = span(this->m_buffer).first(bytes);
    for(const auto datagram = reinterpret_cast<array<ResponsePacket, 8>*>(raw.data());
      const auto& [marker, channel, enabled, voltage, current] : *datagram) {
      this->m_channels[channel] = ChannelData {
        .voltage = static_cast<f32>(voltage) / 1'000.0f,
        .current = static_cast<f32>(current),
        .enabled = static_cast<bool>(enabled)
      };
    }

    llog::trace("[:{} {}V {} mA]", this->m_channels.front().enabled, this->m_channels.front().voltage, this->m_channels.front().current);
  }

  auto PowerSwitch::handle_timer() -> void
  {
    this->request();
    this->m_timer.expires_from_now(boost::posix_time::seconds(this->m_request_interval.count()));
    this->m_timer.async_wait(
      [this](const auto& ec)
      {
        if(ec) {
          llog::error("powerswitch: error: {}", ec.what());
          return;
        }
        this->handle_timer();
      }
    );
  }

  auto PowerSwitch::write(const string_view data) -> void
  {
    this->m_socket.async_send_to(
      asio::buffer(data),
      this->m_target,
      [this](const auto& ec, const auto& bytes_transferred)
      {
        if(ec) {
          llog::error("powerswitch: error: {}", ec.what());
          return;
        }
        llog::trace("powerswitch: sent {} bytes to {}:{}",
          bytes_transferred,
          this->m_target.address().to_string(),
          this->m_target.port()
        );
      }
    );
    this->read();
  }
}

main.cpp:

// Body of main() only (the enclosing function header is not part of this excerpt).
// One io_context drives every asynchronous operation of the PowerSwitch.
auto context = boost::asio::io_context();
  // The constructor configures the socket and starts the timer and read loops.
  auto powerswitch = a::PowerSwitch(
    "192.168.1.50",
    44000,
    context,
    std::chrono::seconds(5)
  );

  // Runs the handlers on this thread until no work is left.
  context.run();
  return 0;

我通过上面的代码得到了以下输出:

[ trace  ] [thread 7144 ]: opening socket at 192.168.1.10:12000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ debug  ] [thread 7144 ]: powerswitch service started at 192.168.1.10:12000 (receiving from 192.168.1.50:44000) 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: [:false 11.961V 477 mA] 
[ trace  ] [thread 7144 ]: [:false 11.934V 493 mA] 
[ trace  ] [thread 7144 ]: [:false 11.974V 728 mA] 
[ trace  ] [thread 7144 ]: [:false 12.006V 543 mA] 
[ trace  ] [thread 7144 ]: [:false 12.004V 543 mA] 
[ trace  ] [thread 7144 ]: [:false 11.953V 692 mA] 
[ trace  ] [thread 7144 ]: [:false 11.959V 491 mA] 
[ trace  ] [thread 7144 ]: [:false 12.063V 583 mA] 
[ trace  ] [thread 7144 ]: [:false 11.833V 615 mA] 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: [:false 12.075V 613 mA] 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 
[ trace  ] [thread 7144 ]: powerswitch: sending planned request 
[ trace  ] [thread 7144 ]: powerswitch: sent 10 bytes to 192.168.1.50:44000 

如果有任何帮助或建议,我将不胜感激。

c++ boost-asio
1个回答
0
投票

问题很多。大部分都是因为过于复杂。

引起我注意的第一个大问题是:你在 `toggle_channel` 中把局部变量 `packet` 传给了 `write` 方法。这会导致未定义的行为,因为异步写入操作会在该变量的生命周期结束之后继续使用它。

老实说,由于 UDP 本质上是即发即忘的,所以我认为没有必要在这里使用异步操作。您只需使用同步

send_to
方法即可完成。

您的

read()
循环和
write()
都链接到更多
read()
操作,这意味着您最终将有许多待处理的读取操作。

析构函数中的

stop()
确实没什么用。只有在 io_context 完成全部工作之后才会执行到它,这意味着此时已经没有什么需要停止的了。您可能需要一个
cancel()
方法来取消所有挂起的操作,然后等待它们完成。

当您处理传入消息时,您必须将其视为不受信任的输入。这意味着您应该在使用它之前对其进行验证。最重要的是,您应该验证通道索引是否在范围内。使用

std::array::at
而不是
std::array::operator[]
将为您提供边界检查。

简化

handle_incoming
方法可能看起来像:

// Decodes every complete ResponsePacket in the first `bytes` bytes of m_buffer
// into m_channels, then logs the first channel. m_channels.at() throws
// std::out_of_range for a channel number outside the table.
void PowerSwitch::handle_incoming(usize bytes) {
    auto const  count = bytes / sizeof(ResponsePacket);
    auto const* first = reinterpret_cast<ResponsePacket const*>(m_buffer.data());

    for (auto& [marker, channel, enabled, voltage, current] :
         std::span<ResponsePacket const>(first, count)) {
        ChannelData& slot = m_channels.at(channel);
        slot = {.voltage = voltage / 1'000.0f,
                .current = static_cast<f32>(current),
                .enabled = static_cast<bool>((enabled))};
    }

    auto& head = m_channels.front();
    llog::trace("[:{} {}V {} mA]", head.enabled, head.voltage, head.current);
}

请注意,这假设您已确保缓冲区按 `ResponsePacket` 的对齐要求正确对齐:

// Receive buffer, aligned as ResponsePacket so handle_incoming() can view its
// bytes as ResponsePacket objects; zero-initialised.
alignas(ResponsePacket) std::array<u8, 1024> m_buffer{};

再稍微扩展一下检查:

// Decodes every complete ResponsePacket in the first `bytes` bytes of m_buffer.
// Packets with an unexpected marker or an out-of-range channel are logged as
// warnings and skipped; valid ones update m_channels. Finally the first
// channel is logged.
void PowerSwitch::handle_incoming(usize bytes) {
    auto const count   = bytes / sizeof(ResponsePacket);
    auto const packets = std::span<ResponsePacket const>(
        reinterpret_cast<ResponsePacket const*>(m_buffer.data()), count);

    for (auto& [marker, channel, enabled, voltage, current] : packets) {
        if (marker != RESPONSE_MARKER) {
            llog::warn("powerswitch: invalid response marker: {:#x}", marker);
            continue;
        }
        if (channel >= m_channels.size()) {
            llog::warn("powerswitch: invalid channel: {:#x}", channel);
            continue;
        }
        m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
                                  .current = static_cast<f32>(current),
                                  .enabled = static_cast<bool>((enabled))};
    }

    auto& head = m_channels.front();
    llog::trace("[:{} {}V {} mA]", head.enabled, head.voltage, head.current);
}

演示

这是我修复这些问题的尝试。为了让示例能够独立编译运行,我虚构了一些细节,并在此过程中简化了许多地方。

// NOTE(review): this definition sits before the #include lines and the class
// declaration of the listing below and repeats the snippet shown earlier; it
// looks like an extraction artifact rather than part of the demo.
//
// Decodes every complete ResponsePacket in the first `bytes` bytes of m_buffer.
// Packets with an unexpected marker or an out-of-range channel are logged as
// warnings and skipped; valid ones update m_channels. Finally the first
// channel is logged.
void PowerSwitch::handle_incoming(usize bytes) {
    auto const count   = bytes / sizeof(ResponsePacket);
    auto const packets = std::span<ResponsePacket const>(
        reinterpret_cast<ResponsePacket const*>(m_buffer.data()), count);

    for (auto& [marker, channel, enabled, voltage, current] : packets) {
        if (marker != RESPONSE_MARKER) {
            llog::warn("powerswitch: invalid response marker: {:#x}", marker);
            continue;
        }
        if (channel >= m_channels.size()) {
            llog::warn("powerswitch: invalid channel: {:#x}", channel);
            continue;
        }
        m_channels.at(channel) = {.voltage = voltage / 1'000.0f,
                                  .current = static_cast<f32>(current),
                                  .enabled = static_cast<bool>((enabled))};
    }

    auto& head = m_channels.front();
    llog::trace("[:{} {}V {} mA]", head.enabled, head.voltage, head.current);
}
#include <boost/asio.hpp>
#include <fmt/core.h>
#include <fmt/ostream.h>
#include <fmt/ranges.h>
#include <iostream>
#include <memory>
#include <span>
#include <string_view>
namespace asio = boost::asio;
using namespace std::chrono_literals;

// Lets fmt format udp::endpoint values by deriving from fmt's ostream_formatter.
template <> struct fmt::formatter<asio::ip::udp::endpoint> : ostream_formatter {};

namespace llog {
    template <typename... Args> constexpr auto trace(auto const& fmt, Args const&... args) -> void {
        std::cout << "TRACE\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }
    template <typename... Args> constexpr auto debug(auto const& fmt, Args const&... args) -> void {
        std::cout << "DEBUG\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }
    template <typename... Args> constexpr auto error(auto const& fmt, Args const&... args) -> void {
        std::cout << "ERROR\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }
    template <typename... Args> constexpr auto warn(auto const& fmt, Args const&... args) -> void {
        std::cout << "WARN\t" << fmt::format(fmt::runtime(fmt), args...) << std::endl;
    }   
} // namespace llog

namespace mylib {
    using udp = asio::ip::udp;
    using boost::system::error_code;
    using f32   = float;
    using u32   = uint32_t;
    using u16   = uint16_t;
    using u8    = uint8_t;
    using usize = size_t;
    using duration = std::chrono::steady_clock::duration;

    /// Client for a UDP-controlled power switch: periodically sends a request
    /// to the device, decodes its response datagrams and keeps the latest
    /// reading per channel. All asynchronous work runs on the io_context
    /// passed to the constructor.
    class PowerSwitch {
        // Outgoing request as sent on the wire (10 bytes, see static_assert below).
        struct [[gnu::packed]] RequestPacket {
            u32 marker;        // set to REQUEST_MARKER
            u16 channel;       // channel number the request refers to
            u16 response_port; // set to the port of m_endpoint
            u16 checksum;      // always sent as 0 by toggle_channel()
        };
        // One entry of an incoming response datagram (12 bytes).
        struct [[gnu::packed]] ResponsePacket {
            u32 marker;  // must equal RESPONSE_MARKER to be accepted
            u16 channel; // index into m_channels
            u16 enabled; // converted to bool
            u16 voltage; // divided by 1000 before being stored
            u16 current; // stored as-is (logged as mA)
        };
        // Decoded state of one channel.
        struct ChannelData {
            f32  voltage;
            f32  current;
            bool enabled;
        };

        // The packets are reinterpreted from / to raw bytes, so their size and
        // layout properties are pinned at compile time.
        static_assert(sizeof(mylib::PowerSwitch::RequestPacket) == 10);
        static_assert(sizeof(mylib::PowerSwitch::ResponsePacket) == 12);
        static_assert(std::is_trivial_v<mylib::PowerSwitch::RequestPacket>);
        static_assert(std::is_trivial_v<mylib::PowerSwitch::ResponsePacket>);
        static_assert(std::is_standard_layout_v<mylib::PowerSwitch::RequestPacket>);
        static_assert(std::is_standard_layout_v<mylib::PowerSwitch::ResponsePacket>);

      public:
        /// Binds a UDP socket to LOCAL_PORT and starts the request and read loops.
        /// @param ipv4             IPv4 address of the device, as text.
        /// @param port             UDP port of the device.
        /// @param context          io_context that runs the handlers.
        /// @param request_interval Period between two planned requests.
        explicit PowerSwitch(std::string_view ipv4, u16 port, asio::io_context& context,
                             duration request_interval);
        /// Calls cancel().
        ~PowerSwitch();

        /// Sends a request packet for `channel` to the device.
        void toggle_channel(int channel);
        /// Cancels the pending timer wait and socket operations.
        void cancel();

      private:
        // NOTE(review): no definition of configure() appears in this listing.
        void configure(std::string_view ipv4, u16 port, duration request_interval);
        /// Sends the periodic request (a toggle of DUMMY_CHANNEL).
        void send_request();
        /// Receives one datagram, handles it, and re-arms itself.
        void read_loop();
        /// Decodes the first `bytes` bytes of m_buffer into m_channels.
        void handle_incoming(usize bytes);
        /// Sends a request and schedules the next one after m_request_interval.
        void timed_request_loop();
        /// Sends `data` synchronously to m_target.
        void write(std::span<std::byte const> data);

      private:
        // Members are initialised in declaration order: m_endpoint and m_timer
        // depend on m_socket, and m_timer also on m_request_interval.
        udp::socket        m_socket;
        // Initialised from the socket's local endpoint; its port is sent as
        // response_port in every request.
        udp::endpoint      m_endpoint = m_socket.local_endpoint();
        duration           m_request_interval;
        // Device endpoint that requests are sent to.
        udp::endpoint      m_target;
        asio::steady_timer m_timer{m_socket.get_executor(), m_request_interval};

        // Latest decoded reading per channel, indexed by channel number.
        std::array<ChannelData, 8> m_channels{};
        // Receive buffer, aligned as ResponsePacket for handle_incoming().
        alignas(ResponsePacket) std::array<u8, 1024> m_buffer{};
    };
} // namespace mylib

namespace mylib {
    constexpr u16  LOCAL_PORT      = 12000;
    constexpr u16  DUMMY_CHANNEL   = 9'999;
    constexpr auto REQUEST_MARKER  = 0xAAAAAAAA;
    constexpr auto RESPONSE_MARKER = 0x55555555;

    PowerSwitch::PowerSwitch(std::string_view ipv4, u16 port, asio::io_context& context,
                             duration request_interval)
        : m_socket(context, {{}, LOCAL_PORT})
        , m_request_interval(request_interval)
        , m_target(asio::ip::make_address_v4(ipv4), port) {

        llog::trace("opening socket at {}", m_endpoint);

        timed_request_loop();
        read_loop();

        llog::debug("powerswitch service started at {} (receiving from {})", m_endpoint, m_target);
    }

    /// Cancels outstanding timer and socket operations via cancel().
    PowerSwitch::~PowerSwitch()
    {
        this->cancel();
    }

    /// Builds one RequestPacket for `channel` and sends its raw bytes to the
    /// device. The packet advertises the port of m_endpoint as response port
    /// and carries a zero checksum.
    void PowerSwitch::toggle_channel(int channel) {
        RequestPacket const packet{
            .marker        = REQUEST_MARKER,
            .channel       = static_cast<u16>(channel),
            .response_port = m_endpoint.port(),
            .checksum      = 0x0000,
        };

        // View the single packet as a byte range for write().
        auto const bytes = std::as_bytes(std::span(&packet, 1));
        write(bytes);
    }

    void PowerSwitch::cancel() {
        llog::trace("closing connection to {}", m_endpoint);
        m_timer.cancel();
        m_socket.cancel();
    }

    void PowerSwitch::send_request() {
        llog::trace("powerswitch: sending planned request");
        toggle_channel(DUMMY_CHANNEL);
    }

    void PowerSwitch::read_loop() {
        m_socket.async_receive_from( //
            asio::buffer(m_buffer), m_endpoint, [this](error_code ec, size_t bytes_transferred) {
                if (ec) {
                    llog::error("powerswitch: error: {}", ec.what());
                    return;
                }
                handle_incoming(bytes_transferred);
                read_loop();
            });
    }

    /// Decodes every complete ResponsePacket in the first `bytes` bytes of
    /// m_buffer. Packets with an unexpected marker or an out-of-range channel
    /// are logged as warnings and skipped; valid ones update m_channels and
    /// are logged.
    void PowerSwitch::handle_incoming(usize bytes) {
        auto const count   = bytes / sizeof(ResponsePacket);
        auto const packets = std::span<ResponsePacket const>(
            reinterpret_cast<ResponsePacket const*>(m_buffer.data()), count);
        llog::debug("powerswitch: received {} responses in {} bytes", count, bytes);

        for (auto& [marker, channel, enabled, voltage, current] : packets) {
            if (marker != RESPONSE_MARKER) {
                llog::warn("powerswitch: invalid response marker: {:#x}", marker);
                continue;
            }
            if (channel >= m_channels.size()) {
                llog::warn("powerswitch: invalid channel: {:#x}", channel);
                continue;
            }

            auto& slot = m_channels.at(channel);
            slot       = {.voltage = voltage / 1'000.0f,
                          .current = static_cast<f32>(current),
                          .enabled = static_cast<bool>((enabled))};
            llog::debug("powerswitch: updated channel {} [:{} {}V {} mA]", channel, //
                        slot.enabled, slot.voltage, slot.current);
        }

        /*
         * Disabled summary of the first channel:
         * auto& chan = m_channels.front();
         * llog::trace("front [:{} {}V {} mA]", chan.enabled, chan.voltage, chan.current);
         */
    }

    /// One iteration of the periodic request loop: sends a request now, then
    /// arms m_timer so this function runs again after m_request_interval.
    /// A wait that completes with an error (for example after cancel()) is
    /// logged and ends the loop.
    void PowerSwitch::timed_request_loop() {
        send_request();
        // expires_after replaces the deprecated expires_from_now on steady_timer.
        m_timer.expires_after(m_request_interval);
        m_timer.async_wait([this](error_code ec) {
            if (ec) {
                llog::error("powerswitch: error: {}", ec.what());
                return;
            }
            timed_request_loop();
        });
    }

    /// Sends `data` as one datagram to m_target, synchronously. A failure is
    /// logged as an error; the trace line with the byte count and the error
    /// message is written in either case.
    void PowerSwitch::write(std::span<std::byte const> data) {
        error_code  failure;
        auto const  payload = asio::buffer(data);
        auto        sent    = m_socket.send_to(payload, m_target, {}, failure);

        if (failure) {
            llog::error("powerswitch: error: {}", failure.what());
        }

        llog::trace("powerswitch: sent {} bytes to {} ({})", sent, m_target, failure.message());
    }
} // namespace mylib

int main() {
    auto context     = asio::io_context();
 // auto powerswitch = mylib::PowerSwitch("192.168.1.50", 44000, context, 5s);
    auto powerswitch = mylib::PowerSwitch("127.0.0.1", 44000, context, 5s);

    asio::signal_set signals(context, SIGINT, SIGTERM);
    signals.async_wait([&](mylib::error_code ec, int signal) {
        if (!ec) {
            llog::debug("received signal {}", signal);
            //context.stop();
            powerswitch.cancel();
        }
    });

    context.run();
}

本地演示:

© www.soinside.com 2019 - 2024. All rights reserved.