Boost.asio 使用协程实现多个连接的 echo 服务器

问题描述 投票:0回答:1

我修改了 Boost.Asio 官方网站上的一个 Unix 套接字 echo 服务器示例,并将其改写为使用协程。

我想让该服务器能够同时处理多个连接。一种解决方案是在后台线程中运行 session 类的 start 方法,但我更喜欢协程,因此我尝试让每个侦听器运行在各自独立的 spawn(协程)中。下面是我的代码(可编译版本)。

请注意,在方法 `server::start` 中,当我只尝试一个侦听器时,它可以正常工作,但换成多个套接字后就不起作用了。也许有人能告诉我哪里做错了?谢谢!

#include <cstdio>
#include <iostream>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>


using namespace std::chrono_literals;

constexpr auto kTimeout = 1s;
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)

using boost::asio::local::stream_protocol;
using boost::asio::io_context;

/// One client connection: owns the socket, a fixed-size receive buffer and a
/// timer, and echoes data back from inside a stackful coroutine.
class session: public boost::enable_shared_from_this<session>
{
public:
  /// Builds an unconnected session whose socket and timer both use
  /// @p io_context.
  session(boost::asio::io_context& io_context)
    : socket_(io_context),
      timer_(io_context)
  {
  }

  /// Exposes the socket so that an acceptor can accept a connection into it.
  stream_protocol::socket& socket()
  {
    return socket_;
  }

  /// Runs the echo loop on the coroutine identified by @p yield.
  ///
  /// Each round reads into the whole buffer, waits kTimeout on the timer and
  /// then writes back exactly the number of bytes that were read. The loop
  /// has no exit condition of its own.
  void start(const boost::asio::yield_context &yield)
  {
    for (;;) {
      // transfer_all: the read completes once data_ is full or an error occurs.
      const auto received = boost::asio::async_read(
          socket_, boost::asio::buffer(data_, sizeof(data_)),
          boost::asio::transfer_all(), yield);

      // Timer wait standing in for real data processing.
      timer_.expires_after(kTimeout);
      timer_.async_wait(yield);

      boost::asio::async_write(socket_,
                               boost::asio::buffer(data_, received),
                               yield);
    }
  }

private:
  // Socket used to talk to the client.
  stream_protocol::socket socket_;

  // Storage for the data received from the client.
  boost::array<char, 256> data_;

  // Timer used to simulate processing time between read and write.
  boost::asio::steady_timer timer_;
};

typedef boost::shared_ptr<session> session_ptr;

/// Listens on a local stream endpoint and serves connections from coroutines.
///
/// The constructor binds the acceptor to `file` and immediately spawns three
/// coroutines on the same strand; each one accepts a single connection on the
/// shared acceptor and then runs that session's echo loop.
class server
{
public:
  /// @param ioc  io_context that the strand, acceptor and sessions use.
  /// @param file Path used to build the local stream endpoint.
  server(boost::asio::io_context& ioc, const std::string& file)
    : io_context_(ioc),
      strand_(io_context_.get_executor()),
      acceptor_(strand_, stream_protocol::endpoint(file))
  {

      // Three coroutines, all spawned on strand_ and all using acceptor_.
      for (int i=0 ; i < 3; i++) {
          boost::asio::spawn(strand_,
                             [this](const boost::asio::yield_context &yield) {
              session_ptr new_session(new session(this->io_context_));
              
              try {
                  // Suspend until one connection is accepted into the session's socket.
                  acceptor_.async_accept(new_session->socket(), yield);
                  // start() loops without an exit condition, so control only
                  // reaches the handler below through an exception.
                  new_session->start(yield);
              } catch (std::exception &e) {
                  // The coroutine ends after logging; it does not issue another accept.
                  std::cout << "new connection needed: " << e.what() << std::endl;
              }
          });
        }
   }

private:
  // NOTE(review): io_service is the legacy Boost.Asio name for io_context —
  // confirm it is still provided by the Boost version in use.
  boost::asio::io_service& io_context_;
  // Strand shared by the acceptor and by every spawned coroutine.
  boost::asio::strand<io_context::executor_type> strand_;
  stream_protocol::acceptor acceptor_;
};

/// Entry point of the echo server.
///
/// Expects exactly one argument: the path of the local socket file. Any
/// existing file at that path is removed before the server is created.
///
/// @return 0 when the event loop finishes normally, 1 on a usage error or
///         when an exception escapes the server.
int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: stream_server <file>\n";
      std::cerr << "*** WARNING: existing file is removed ***\n";
      return 1;
    }

    boost::asio::io_context ioc;

    // Delete any existing file at the path before the server uses it as its endpoint.
    std::remove(argv[1]);
    server s(ioc, argv[1]);

    ioc.run();
  }
  catch (const std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << "\n";
    // Report failure through the exit status instead of falling through to 0.
    return 1;
  }

  return 0;
}

#else // defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
# error Local sockets not available on this platform.
#endif // defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
c++ sockets boost boost-asio
1个回答
0
投票

您只需要运行 1 个 accept 操作,并且只在接受到连接之后才生成(spawn)会话。

更好的做法是,使用 `async_accept` 中以移动方式交出套接字的重载,并为会话指定 strand:您不需要让所有会话共享用来保护服务器/接受器的同一个 strand。

不要使用计数循环,只需把 `async_accept` 调用链接起来:

/// Accepts connections one after another and logs each accept result.
///
/// The acceptor runs on its own strand, and every accepted socket is created
/// on a fresh strand of the same io_context.
struct Server {
    /// @param ioc  io_context used for the acceptor and the accepted sockets.
    /// @param file Path used to build the local stream endpoint.
    Server(asio::io_context& ioc, std::string const& file)
        : io_context_(ioc)
        , acceptor_(make_strand(ioc), Protocol::endpoint(file))
    {
        accept_loop();
    }

  private:
    // Declared as io_context& to match the constructor parameter rather than
    // through the legacy io_service alias.
    asio::io_context&  io_context_;
    // No separate strand_ member: the acceptor is constructed on its own strand.
    Protocol::acceptor acceptor_;

    /// Starts one asynchronous accept; the handler re-arms the next accept
    /// unless the operation was aborted, then prints the outcome.
    void accept_loop() {
        acceptor_.async_accept(make_strand(io_context_), [this](error_code ec, Protocol::socket s) {
            // Keep accepting unless this accept was cancelled.
            if (ec != asio::error::operation_aborted)
                accept_loop();

            std::cout << "Accepted: " << ec.message();
            // The peer endpoint is only queried when the accept succeeded.
            if (!ec)
                std::cout << " from " << s.remote_endpoint();
            std::cout << std::endl;
        });
    }
};

您会注意到,为了简化我还改动了其他一些地方:Live On Coliru

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>

using namespace std::chrono_literals;
using namespace std::placeholders;
namespace asio = boost::asio;

constexpr auto kTimeout = 1s;
using Protocol   = asio::local::stream_protocol;
using error_code = boost::system::error_code;

struct Session : public std::enable_shared_from_this<Session> {
    Session(Protocol::socket s) : socket_(std::move(s)) {}
    ~Session() { std::cout << __FUNCTION__ << std::endl; }

    void start(asio::yield_context yield) try {
        for (;;) {
            auto n = async_read(socket_, asio::buffer(data_), yield);

            { // simulate work
                asio::steady_timer t(yield.get_executor(), kTimeout);
                t.async_wait(yield);
            }

            n = async_write(socket_, asio::buffer(data_, n), yield);
        }
    } catch (boost::system::system_error const& se) {
        std::cerr << "Session failed: " << se.code().message() << std::endl;
    }

  private:
    Protocol::socket      socket_;
    std::array<char, 256> data_;
};

/// Accepts connections one after another and runs each one as a Session
/// coroutine on that connection's own strand.
struct Server {
    /// @param ioc  io_context used for the acceptor and the accepted sockets.
    /// @param file Path used to build the local stream endpoint.
    Server(asio::io_context& ioc, std::string const& file)
        : io_context_(ioc)
        , acceptor_(make_strand(ioc), Protocol::endpoint(file))
    {
        accept_loop();
    }

  private:
    // Declared as io_context& to match the constructor parameter rather than
    // through the legacy io_service alias.
    asio::io_context&  io_context_;
    Protocol::acceptor acceptor_;

    /// Starts one asynchronous accept. The handler re-arms the next accept
    /// unless the operation was aborted, logs the outcome, and spawns a
    /// Session only when the accept succeeded.
    void accept_loop() {
        acceptor_.async_accept(make_strand(io_context_), [this](error_code ec, Protocol::socket s) {
            if (ec != asio::error::operation_aborted)
                accept_loop(); // continue accepting new connections

            std::cout << "Accepted: " << ec.message() << std::endl;

            // A failed accept yields no usable socket, so there is no session to start.
            if (ec)
                return;

            auto strand = s.get_executor(); // copy before move
            spawn(strand, bind(&Session::start, std::make_shared<Session>(std::move(s)), _1));
        });
    }
};

/// Entry point of the echo server.
///
/// Expects exactly one argument: the path of the local socket file. Any
/// existing file at that path is removed before the server is created.
///
/// @return 0 when the event loop finishes normally, 1 on a usage error or
///         when an exception escapes the server.
int main(int argc, char* argv[]) try {
    if (argc != 2) {
        std::cerr << "Usage: stream_server <file>\n"
            "*** WARNING: existing file is removed ***\n";
        return 1;
    }

    asio::io_context ioc;

    // Delete any existing file at the path before the server uses it as its endpoint.
    std::remove(argv[1]);
    Server s(ioc, argv[1]);

    ioc.run();
    return 0;
} catch (std::exception const& e) {
    std::cerr << "Exception: " << e.what() << "\n";
    // Without an explicit return, flowing off this handler would make main return 0.
    return 1;
}

不过,还可以进一步简化!

整个 `shared_ptr` 机制现在只是不必要的复杂化。毕竟,您拥有一个完整的协程栈帧(coro frame),可以让对象保持存活。事实上,这个对象无非就是套接字和缓冲区。只需把 `Session` 做成一个协程即可:Live

/// Echo coroutine: reads a full 256-byte buffer from @p socket, waits
/// kTimeout, writes the same bytes back, and repeats.
///
/// A boost::system::system_error ends the loop and is reported on std::cerr
/// instead of propagating to the caller.
void Session(Protocol::socket& socket, asio::yield_context yield) {
    std::array<char, 256> data;

    try {
        while (true) {
            auto const received = async_read(socket, asio::buffer(data), yield);

            { // simulate work
                asio::steady_timer delay(yield.get_executor(), kTimeout);
                delay.async_wait(yield);
            }

            async_write(socket, asio::buffer(data, received), yield);
        }
    } catch (boost::system::system_error const& err) {
        std::cerr << "Session failed: " << err.code().message() << std::endl;
    }
}

// Spawn the Session function on `strand`: the accepted socket `s` is moved
// into the bound callable, `_1` stands for the yield context supplied by
// spawn, and Session receives the socket as its first argument.
spawn(strand, bind(Session, std::move(s), _1));

旁注

另外,请记住,`async_read` 默认的完成条件已经是 `transfer_all`。如果您需要的是恰好 256 字节的“消息”,您的做法可能没问题。然而,在我看来,这并不像一个“回声服务器”。

在实践中我会使用 `read_some`:

for (;;) {
    auto n = socket.async_read_some(asio::buffer(data), yield);

    { // simulate work
        asio::steady_timer t(yield.get_executor(), kTimeout);
        t.async_wait(yield);
    }

    n = async_write(socket, asio::buffer(data, n), yield);
}

最后,您还需要处理 EOF,因为读取可能只是部分成功:

// Keep echoing until a read reports an error; the loop condition re-checks
// `ec` after every round.
for (boost::system::error_code ec; !ec;) {
    // yield[ec] stores the read's error in `ec` instead of throwing, so a
    // read that ends in EOF can still hand back the bytes it received.
    auto n = socket.async_read_some(asio::buffer(data), yield[ec]);

    if (n) { // simulate work
        asio::steady_timer t(yield.get_executor(), kTimeout);
        t.async_wait(yield);
    }

    // Echo the n bytes when the read succeeded or ended in EOF; any other
    // error skips the write, and the loop condition then ends the loop.
    if (!ec || ec == asio::error::eof)
        n = async_write(socket, asio::buffer(data, n), yield);
}
© www.soinside.com 2019 - 2024. All rights reserved.