如何使这个侵入式列表线程安全?

问题描述 投票:0回答:1

我正在尝试实现一种“信号和槽”机制(类似于 Qt),该机制是使用侵入式链表实现的(与使用 std::list 相比,减少所需的动态内存分配数量)。

基本架构由三个类组成:信号类、槽类、连接类。如果槽类或信号类被销毁,则需要通过 a) 从信号/槽列表中删除连接以及 b) 删除连接对象来中断关联的“连接”。

我能够整理一些按预期工作的简化示例代码。现在,我正在尝试找出如何使其线程安全。

#include <memory>
#include <iostream>
#include <thread>
#include <functional>
#include <mutex>

class A;
class B;

class Node
{
public:
    using Fn = void(B::*)();

public:
    Node(A* a, B* b, Fn fn) : a(a), b(b), fn(fn) {}
    ~Node();

    void foo()
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;
        (b->*fn)();
    }

    A* a;
    B* b;

    Fn fn = nullptr;

    std::shared_ptr<Node> nextA = nullptr;
    std::shared_ptr<Node> nextB = nullptr;
};

class A
{
public:
    A();
    ~A();

    void add(std::shared_ptr<Node> n)
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;

        n->nextA = std::move(list_);
        list_ = std::move(n);
    }

    void remove(Node* nToRemove)
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;

        std::shared_ptr<Node> prev = nullptr;
        for (auto n = list_; n != nullptr; n = n->nextA)
        {
            if (n.get() == nToRemove)
            {
                if (prev == nullptr)
                {
                    list_ = n->nextA;
                }
                else
                {
                    prev->nextA = n->nextA;
                }
                
                break;
            }

            prev = n;
        }
    }

    void foo()
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;
        for (auto n = list_; n != nullptr; n = n->nextA)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            n->foo();
        }
    }

private:
    std::shared_ptr<Node> list_;
};

class B
{
public:
    B();
    ~B();

    void add(std::shared_ptr<Node> n)
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;

        n->nextB = std::move(list_);
        list_ = std::move(n);
    }

    void remove(Node* nToRemove)
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;

        std::shared_ptr<Node> prev = nullptr;
        for (auto n = list_; n != nullptr; n = n->nextB)
        {
            if (n.get() == nToRemove)
            {
                if (prev == nullptr)
                {
                    list_ = n->nextB;
                }
                else
                {
                    prev->nextB = n->nextB;
                }
                
                break;
            }

            prev = n;
        }
    }

    void foo()
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;
    }

    void bar()
    {
        std::cout << __PRETTY_FUNCTION__ << std::endl;
    }

private:
    std::shared_ptr<Node> list_;
};

// Only traces the call, so the moment a Node dies is visible in the output.
Node::~Node()
{
    std::cout << __PRETTY_FUNCTION__
              << std::endl;
}

// Only traces the call; the list head starts out empty by default.
A::A()
{
    std::cout << __PRETTY_FUNCTION__
              << std::endl;
}

// Traces the call, then asks the B behind every node in this list to unlink
// that node from its own list.
A::~A()
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;

    std::shared_ptr<Node> node = list_;
    while (node)
    {
        B* const peer = node->b;
        peer->remove(node.get());
        node = node->nextA;
    }
}

// Only traces the call; the list head starts out empty by default.
B::B()
{
    std::cout << __PRETTY_FUNCTION__
              << std::endl;
}

// Traces the call, then asks the A behind every node in this list to unlink
// that node from its own list.
B::~B()
{
    std::cout << __PRETTY_FUNCTION__ << std::endl;

    std::shared_ptr<Node> node = list_;
    while (node)
    {
        A* const peer = node->a;
        peer->remove(node.get());
        node = node->nextB;
    }
}

/// Creates a single shared Node joining a and b with callback fn, and
/// registers that node first with a, then with b.
void connect(A& a, B& b, Node::Fn fn)
{
    A* const a_ptr = std::addressof(a);
    B* const b_ptr = std::addressof(b);
    const std::shared_ptr<Node> link = std::make_shared<Node>(a_ptr, b_ptr, fn);

    a.add(link);
    b.add(link);
}

// Demo driver: a worker thread connects a short-lived B to the shared A
// while the main thread iterates A's list one second later.
int main()
{
    using namespace std::chrono_literals;

    A a;

    // The worker creates B, connects it, waits a second, then lets B be destroyed.
    auto worker = [](A& shared) {
        B b;
        //connect(shared, b, &B::foo);
        connect(shared, b, &B::bar);
        std::this_thread::sleep_for(1s);
    };
    std::thread t1(worker, std::ref(a));

    std::this_thread::sleep_for(1s);

    a.foo();

    t1.join();
}

额外添加的 sleep_for 语句用于强制产生出问题的执行顺序。目前,代码的输出是:

A::A()
B::B()
void A::add(std::shared_ptr<Node>)
void B::add(std::shared_ptr<Node>)
void A::foo()
B::~B()
void A::remove(Node*)
void Node::foo()
void B::bar()
Node::~Node()
A::~A()

如您所见,`B::bar()` 最终会在 `B` 的实例被销毁之后才被调用(如果该函数不是这么简单,就会导致崩溃)。

我尝试使用互斥体来保护该结构,但遇到了困难,因为数据结构(链表)本身就嵌入在数据之中。当连接“正在使用”时(即 `Node::foo()` 正在被调用时),连接不能被销毁(因此 `A` 和 `B` 也不能被销毁)。但如果 `A` 或 `B` 已被销毁,则该连接不能再被使用。

c++ thread-safety
1个回答
0
投票

下面是我会采用的思路的一个示例:只使用标准库中现成的构建块(因此不需要自己编写数据结构,例如 Node 类)。

#include <functional>
#include <unordered_map>
#include <mutex>
#include <iostream>

namespace details
{

/// Internal callback interface through which a registration removes its
/// slot from the signal that issued it.
struct unreg_signal_itf
{
     virtual ~unreg_signal_itf() = default;

     /// Removes the slot registered under slot_id.
     virtual void unregister(std::size_t slot_id) = 0;
};


/// RAII handle for one active registration: when the handle that currently
/// owns the registration is destroyed, the callback is unregistered.
///
/// The handle is move-only. A copy would make two handles unregister the
/// same id, so copying is disabled; moving transfers the duty to unregister
/// to the new handle and leaves the source inert.
///
/// The handle stores a reference to the signal, so it must not outlive it.
struct registration_t
{
    unreg_signal_itf& signal;
    std::size_t id;

    /// Binds the handle to the issuing signal and the slot id to remove later.
    registration_t(unreg_signal_itf& signal_, std::size_t id_) noexcept
        : signal(signal_), id(id_)
    {
    }

    registration_t(const registration_t&) = delete;
    registration_t& operator=(const registration_t&) = delete;

    /// Takes over the registration; other will no longer unregister anything.
    registration_t(registration_t&& other) noexcept
        : signal(other.signal), id(other.id), m_owns(other.m_owns)
    {
        other.m_owns = false;
    }

    // The reference member cannot be re-seated, so assignment is not offered.
    registration_t& operator=(registration_t&&) = delete;

    ~registration_t()
    {
        if (m_owns)
        {
            signal.unregister(id);
        }
    }

private:
    bool m_owns{true}; // false once the registration has been moved away
};
}

/// Signal that stores callbacks ("slots") and calls all of them on emit.
///
/// Every access to the slot table is serialized by one mutex. Callbacks run
/// while that mutex is held, so a callback must not subscribe to, emit on,
/// or drop a registration of the same signal: std::mutex is not recursive.
///
/// @tparam args_t argument types passed to every slot.
template<typename... args_t>
class signal_t : 
    public details::unreg_signal_itf // internal plumbing for RAII registration object
{
public:

    ~signal_t()
    {
        // Wait for an emit/subscribe/unregister call that currently holds
        // the mutex to finish. This does not protect against calls that
        // start after this point; the owner must rule those out.
        std::scoped_lock lock{m_mtx};
    }

    /// Registers slot and returns the RAII registration object for it.
    /// Destroying that object unregisters the slot again.
    auto subscribe(std::function<void(args_t...)> slot)
    {
        // m_slots is about to change, so serialize with emitters.
        std::scoped_lock lock{m_mtx};
        const std::size_t id = ++m_slot_id;
        // Swap into the freshly created entry instead of copying the callable.
        m_slots[id].swap(slot);
        return details::registration_t{*this, id};
    }

    /// Calls every registered slot with the given arguments.
    ///
    /// Arguments are taken by value and handed to each slot as lvalues, so
    /// every slot sees the same values. Forwarding them inside the loop
    /// would let the first slot move from them.
    void operator()(args_t... args)
    {
        // Keep m_slots stable for the whole emission.
        std::scoped_lock lock{m_mtx};

        for(const auto& [id,slot] : m_slots)
        {
            slot(args...);
        }
    }

private:
    /// Removes the slot registered under id; unknown ids are ignored.
    /// Only meant to be reached through a registration object.
    void unregister(std::size_t id) override
    {
        std::scoped_lock lock{m_mtx};
        // Erase by key: erasing the iterator from find() is undefined when
        // the id is absent and find() returned end().
        m_slots.erase(id);
    }

    std::size_t m_slot_id{0ul};
    std::mutex m_mtx;

    // Maps subscription identifiers to slots so that a registration can be
    // removed by id, without holding pointers or iterators into the table.
    std::unordered_map<std::size_t,std::function<void(args_t...)>> m_slots;
};


// Demo driver: emits once while a subscription is alive and once after it
// has gone out of scope.
int main()
{
    signal_t<int> signal;

    // Slot that prints the emitted value on its own line.
    const auto print_value = [](int value){ std::cout << value << "\n"; };

    // The inner scope bounds the lifetime of the subscription.
    {
        // Slots are std::function objects, so lambdas and many other
        // callables can be subscribed.
        auto subscription = signal.subscribe(print_value);

        signal(42); // will result in the callback being called

        // subscription is destroyed at the closing brace,
        // which removes the callback from the signal.
    }

    // The subscription has expired, so no callback runs for this emit.
    signal(42);

}
© www.soinside.com 2019 - 2024. All rights reserved.