我正在尝试实现一种“信号和槽”机制(类似于 Qt),该机制是使用侵入式链表实现的(与使用 std::list 相比,减少所需的动态内存分配数量)。
基本架构由三个类组成:信号类、槽类、连接类。如果槽类或信号类被销毁,则需要通过 a) 从信号/槽列表中删除连接以及 b) 删除连接对象来中断关联的“连接”。
我能够整理一些按预期工作的简化示例代码。现在,我正在尝试找出如何使其线程安全。
#include <memory>
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>
class A;
class B;
// A single connection between one A and one B.
// The node is intrusive: it carries the "next" links of both lists it is a
// member of (the list kept by A and the list kept by B).
class Node
{
public:
    // Member function of B that is invoked through this connection.
    using Fn = void(B::*)();

    Node(A* a, B* b, Fn fn) : a(a), b(b), fn(fn) {}
    ~Node();

    // Trace the call, then invoke the stored member function on `b`.
    // Neither `b` nor `fn` is checked for null here.
    void foo()
    {
        const char* const signature = __PRETTY_FUNCTION__;
        std::cout << signature << std::endl;
        (b->*fn)();
    }

    A* a;                                   // non-owning back pointer to the A side
    B* b;                                   // non-owning back pointer to the B side
    Fn fn = nullptr;                        // member function called by foo()
    std::shared_ptr<Node> nextA = nullptr;  // successor in A's list
    std::shared_ptr<Node> nextB = nullptr;  // successor in B's list
};
class A
{
public:
A();
~A();
void add(std::shared_ptr<Node> n)
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
n->nextA = std::move(list_);
list_ = std::move(n);
}
void remove(Node* nToRemove)
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
std::shared_ptr<Node> prev = nullptr;
for (auto n = list_; n != nullptr; n = n->nextA)
{
if (n.get() == nToRemove)
{
if (prev == nullptr)
{
list_ = n->nextA;
}
else
{
prev->nextA = n->nextA;
}
break;
}
prev = n;
}
}
void foo()
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
for (auto n = list_; n != nullptr; n = n->nextA)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
n->foo();
}
}
private:
std::shared_ptr<Node> list_;
};
class B
{
public:
B();
~B();
void add(std::shared_ptr<Node> n)
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
n->nextB = std::move(list_);
list_ = std::move(n);
}
void remove(Node* nToRemove)
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
std::shared_ptr<Node> prev = nullptr;
for (auto n = list_; n != nullptr; n = n->nextB)
{
if (n.get() == nToRemove)
{
if (prev == nullptr)
{
list_ = n->nextB;
}
else
{
prev->nextB = n->nextB;
}
break;
}
prev = n;
}
}
void foo()
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
}
void bar()
{
std::cout << __PRETTY_FUNCTION__ << std::endl;
}
private:
std::shared_ptr<Node> list_;
};
// Trace the destruction of a connection node.
Node::~Node()
{
    const char* const signature = __PRETTY_FUNCTION__;
    std::cout << signature << std::endl;
}
// Trace the construction of an A.
A::A()
{
    const char* const signature = __PRETTY_FUNCTION__;
    std::cout << signature << std::endl;
}
// Trace the destruction, then ask the B of every node in this list to unlink
// that node from its own list.
A::~A()
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;
    // "Notify" B to remove nodes
    auto cursor = list_;
    while (cursor != nullptr)
    {
        cursor->b->remove(cursor.get());
        cursor = cursor->nextA;
    }
}
// Trace the construction of a B.
B::B()
{
    const char* const signature = __PRETTY_FUNCTION__;
    std::cout << signature << std::endl;
}
// Trace the destruction, then ask the A of every node in this list to unlink
// that node from its own list.
B::~B()
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;
    // "Notify" A to remove nodes
    auto cursor = list_;
    while (cursor != nullptr)
    {
        cursor->a->remove(cursor.get());
        cursor = cursor->nextB;
    }
}
// Create one node that links `a` to `b` with `fn` as the call target and
// insert it into the lists of both objects (first a, then b).
void connect(A& a, B& b, Node::Fn fn)
{
    A* const caller = std::addressof(a);
    B* const callee = std::addressof(b);
    const auto node = std::make_shared<Node>(caller, callee, fn);
    a.add(node);
    b.add(node);
}
// Demo driver: a worker thread creates a B, connects it to `a`, and lets the
// B go out of scope after one second, while the main thread calls a.foo()
// after its own one second delay. The sleeps are there to provoke the
// interleaving shown in the surrounding text.
int main()
{
    A a;

    const auto worker = [](A& emitter) {
        B b;
        //connect(emitter, b, &B::foo);
        connect(emitter, b, &B::bar);
        std::this_thread::sleep_for(std::chrono::seconds(1));
    };

    std::thread t1(worker, std::ref(a));
    std::this_thread::sleep_for(std::chrono::seconds(1));
    a.foo();
    t1.join();
}
添加额外的 sleep_for 语句以强制乱序执行。目前,代码的输出是:
A::A()
B::B()
void A::add(std::shared_ptr<Node>)
void B::add(std::shared_ptr<Node>)
void A::foo()
B::~B()
void A::remove(Node*)
void Node::foo()
void B::bar()
Node::~Node()
A::~A()
如您所见,
B::bar()
最终会在B
的实例被销毁后被调用(如果函数不那么简单,这会导致崩溃)。
我尝试使用互斥体来保护结构,但我遇到了困难,因为数据结构(链表)嵌入了数据本身。当连接“正在使用”时(即,当
Node::foo()
被调用时),连接不能被销毁(因此 A
和 B
也不能被销毁)。但如果 A
或 B
已被销毁,则该连接不能再被使用。
这是我将采取的方向的示例,使用标准库中可用的构建块(因此没有我自己的数据结构,例如 Node 类)
#include <functional>
#include <unordered_map>
#include <mutex>
#include <iostream>
namespace details
{
    /// Internal callback interface through which a registration handle asks
    /// its signal to drop a slot again.
    struct unreg_signal_itf
    {
        virtual ~unreg_signal_itf() = default;

        /// Remove the slot that was registered under @p slot_id.
        virtual void unregister(std::size_t slot_id) = 0;
    };

    /// Handle for one active registration: destroying the handle calls
    /// `signal.unregister(id)`.
    ///
    /// The handle is move-only. The previous aggregate was implicitly
    /// copyable, so every copy (including a "move", which fell back to a
    /// copy) unregistered the same id again from its own destructor and the
    /// subscription was dropped as soon as the first copy died.
    ///
    /// NOTE(review): the handle stores a plain reference to the signal, so it
    /// presumably must not outlive that signal - confirm with callers.
    struct registration_t
    {
        unreg_signal_itf& signal;
        std::size_t id;

        /// Bind the handle to @p signal_ and the slot identifier @p id_.
        /// Intentionally not `explicit` so existing brace-initialisation
        /// (`registration_t{sig, id}` / `return {sig, id};`) keeps working.
        registration_t(unreg_signal_itf& signal_, std::size_t id_) noexcept
            : signal(signal_), id(id_)
        {
        }

        registration_t(const registration_t&) = delete;
        registration_t& operator=(const registration_t&) = delete;

        /// Take over the registration; @p other is disarmed and will not
        /// unregister anything when it is destroyed.
        registration_t(registration_t&& other) noexcept
            : signal(other.signal), id(other.id), m_armed(other.m_armed)
        {
            other.m_armed = false;
        }

        // The reference member cannot be re-seated, so assignment stays off.
        registration_t& operator=(registration_t&&) = delete;

        ~registration_t()
        {
            // Only the handle that still owns the registration unregisters.
            if (m_armed)
            {
                signal.unregister(id);
            }
        }

    private:
        bool m_armed{true};  // false once the registration was moved away
    };
}
/// Signal that calls every subscribed slot with a fixed argument list.
///
/// All access to the slot table is serialised by one mutex. That mutex is
/// also held while the slots are being called, so `unregister` (and with it
/// the destructor of a registration handle) on another thread blocks until a
/// running emission has finished.
///
/// Because `std::mutex` is not recursive, a slot must not subscribe to,
/// emit, or drop a registration of the very signal that is currently
/// calling it.
template<typename... args_t>
class signal_t :
    public details::unreg_signal_itf // internal plumbing for RAII registration object
{
public:
    ~signal_t()
    {
        // Wait for an emission or slot-table update that is still holding
        // the mutex before the members are torn down. This does not stop
        // calls that start afterwards; callers must not use the signal once
        // destruction has begun.
        std::scoped_lock lock{m_mtx};
    }

    /// Subscribe a callback.
    /// @param slot callable invoked on every emission
    /// @return a registration handle; destroying it removes the callback
    auto subscribe(std::function<void(args_t...)> slot)
    {
        // The slot table is about to change: make that thread safe.
        std::scoped_lock lock{m_mtx};
        m_slots[++m_slot_id] = slot;
        return details::registration_t{*this, m_slot_id};
    }

    /// Call all subscribed callbacks with the given arguments.
    void operator()(args_t&&... args)
    {
        // Keep the slot table stable while foreign code is being called.
        std::scoped_lock lock{m_mtx};
        for (const auto& entry : m_slots)
        {
            // Pass the arguments as lvalues. Forwarding them as rvalues here
            // would let the first slot move from them and hand moved-from
            // values to every slot after it.
            entry.second(args...);
        }
    }

private:
    // Only meant to be called by a registration_t object; private so other
    // clients cannot (easily) misuse it.
    void unregister(std::size_t id) override
    {
        std::scoped_lock lock{m_mtx};
        // Erase by key: an id that is not (or no longer) in the table is a
        // no-op, whereas erasing the end iterator returned by a failed
        // find() is undefined behaviour.
        m_slots.erase(id);
    }

    std::size_t m_slot_id{0ul};  // last identifier handed out by subscribe()
    std::mutex m_mtx;            // guards m_slot_id and m_slots

    // Map from subscription identifier to slot; makes it easy to unregister
    // a callback without resorting to pointers.
    std::unordered_map<std::size_t,std::function<void(args_t...)>> m_slots;
};
// Demo driver: subscribe a lambda, emit once while the subscription is alive
// and once after it has gone out of scope.
int main()
{
    signal_t<int> signal;

    // With the std::function approach many kinds of callables can be
    // subscribed, for example a lambda expression.
    const auto print_value = [](int value) { std::cout << value << "\n"; };

    // Scope that bounds the lifetime of the subscription.
    {
        auto subscription = signal.subscribe(print_value);
        signal(42); // will result in the callback being called
        // `subscription` goes out of scope here, which removes the callback
        // from the signal.
    }

    // The subscription has expired, so the callback is no longer called.
    signal(42);
}