Anthony Williams 的无锁队列(第 7 章)中的 push() 如何分配新节点

问题描述 投票:0回答:1

我目前正在研究 Anthony Williams 的 C++ concurrency in action (2E),我有很多技术问题,但到目前为止,即使它有效,这也是最奇怪的。

#pragma once


template<typename T>
class CLockFreeQueue
{
private:
    struct node; 
    struct counted_node_ptr 
    {
        int external_count;
        node* ptr; //This points to the actual nodes. 
    };

    std::atomic<counted_node_ptr> mHead; 
    std::atomic<counted_node_ptr> mTail; 

    struct node_counter
    {
        unsigned internal_count : 30; 
        unsigned external_counters : 2;
    };

    //This is the node that holds make up the list
    struct node
    {
        std::atomic<T*> data; 
        std::atomic<node_counter> count; 
        std::atomic<counted_node_ptr> next; 

        
        node()
        {
            node_counter new_count;
            new_count.internal_count = 0;
            new_count.external_counters = 2;
            count.store(new_count);

            counted_node_ptr dummy = { 0 };
            next.store(dummy);
            
            data.store(nullptr);
        }

        void release_ref()
        {
            node_counter old_counter =
                count.load(std::memory_order_relaxed);
            node_counter new_counter;
            do
            {
                new_counter = old_counter; 
                --new_counter.internal_count;
            } while (!count.compare_exchange_strong(old_counter, new_counter,
                                                    std::memory_order_acquire, std::memory_order_relaxed));

            //If both values are zero, we can delete
            if (!new_counter.internal_count && 
                !new_counter.external_counters)
            {
                delete this;
            }
        }
    };

    static void increase_external_count(
        std::atomic<counted_node_ptr>& counter,
        counted_node_ptr& old_counter)
    {
        counted_node_ptr new_counter;
        do
        {
            new_counter = old_counter;
            ++new_counter.external_count;
        } while (!counter.compare_exchange_strong(
            old_counter, new_counter,
            std::memory_order_acquire, std::memory_order_relaxed));
        old_counter.external_count = new_counter.external_count;
    }

    static void free_external_counter(counted_node_ptr& old_node_ptr)
    {
        node* const ptr = old_node_ptr.ptr;
        int const count_increase = old_node_ptr.external_count - 2;
        node_counter old_counter =
            ptr->count.load(std::memory_order_relaxed);
        node_counter new_counter;
        do
        {
            new_counter = old_counter;
            --new_counter.external_counters;
            new_counter.internal_count += count_increase;
        } while (!ptr->count.compare_exchange_strong(
            old_counter, new_counter,
            std::memory_order_acquire, std::memory_order_relaxed));
        if (!new_counter.internal_count &&
            !new_counter.external_counters)
        {
            delete ptr;
        }
    }

    void set_new_tail(counted_node_ptr& old_tail,
        counted_node_ptr const& new_tail)
    {
        node* const current_tail_ptr = old_tail.ptr;
        while (!mTail.compare_exchange_weak(old_tail, new_tail) &&
            old_tail.ptr == current_tail_ptr);
        if (old_tail.ptr == current_tail_ptr)
            free_external_counter(old_tail);
        else
            current_tail_ptr->release_ref();
    }
public:
    CLockFreeQueue() {
        counted_node_ptr h;
        h.ptr = new node;
        h.external_count = 1;

        mHead.store(h);
        mTail.store(mHead.load());
    }

    CLockFreeQueue(const CLockFreeQueue&) = delete;
    CLockFreeQueue& operator=(const CLockFreeQueue&) = delete;

    ~CLockFreeQueue() {
        while (pop());
    }

    void push(T new_value)
    {
        std::unique_ptr<T> new_data(new T(new_value));
        counted_node_ptr new_next;
        new_next.ptr = new node;
        new_next.external_count = 1;
        counted_node_ptr old_tail = mTail.load();
        for (;;)
        {
            increase_external_count(mTail, old_tail);
            T* old_data = nullptr;
            if (old_tail.ptr->data.compare_exchange_strong(
                old_data, new_data.get())) //if successfully changed the old_tail.ptr->data value atomically 
            {
                counted_node_ptr old_next = { 0 };
                if (!old_tail.ptr->next.compare_exchange_strong(
                    old_next, new_next)) 
                { 
                    delete new_next.ptr;
                    new_next = old_next; 
                }

              
                set_new_tail(old_tail, new_next);
                new_data.release();
                break;
            }
            else 
            {
                counted_node_ptr old_next = { 0 };
                if (old_tail.ptr->next.compare_exchange_strong(
                    old_next, new_next)) 
                { 

                    old_next = new_next;
                    new_next.ptr = new node; 
                                       
                } 
         
                set_new_tail(old_tail, old_next);
            }
        }
    }

    std::unique_ptr<T> pop()
    {
        counted_node_ptr old_head = mHead.load(std::memory_order_relaxed);
        for (;;)
        {
            increase_external_count(mHead, old_head);
            node* const ptr = old_head.ptr;
            if (ptr == mTail.load().ptr) //If the list is empty
            {
                ptr->release_ref();
                return std::unique_ptr<T>();
            }
            counted_node_ptr next = ptr->next.load();
            if (mHead.compare_exchange_strong(old_head, next))
            {
                T* const res = ptr->data.exchange(nullptr);
                free_external_counter(old_head);
                return std::unique_ptr<T>(res);
            }
            ptr->release_ref();
        }
    }
};

现在我设法让他的最终无锁队列工作,但有些事情让我感到困惑——它如何在 push() 中获得一个新的 counted_node_ptr?

    counted_node_ptr new_next;
    new_next.ptr = new node;
    new_next.external_count = 1;

我的意思是看起来每个节点都封装了一个 counted_node_ptr 但在 push() 中他似乎在函数中使用堆栈分配来创建一个新的 counted_node_ptr (new_next) 然后使用它交换尾巴

          if (!old_tail.ptr->next.compare_exchange_strong(
                old_next, new_next))

push() 退出时,new_next 不会无效(删除)吗? compare_exchange_strong 是否创建了它的新实例?这让我很好奇。

我的代码有效,我测试了(不确定是否有内存泄漏)但是这个让我感到困惑。

我知道这可能是一个平凡的问题,特别是因为它正在工作,但我只是好奇。我的想法是,如果你要向动态队列添加一些东西,你应该在动态堆而不是堆栈上分配每个新节点。

当 push() 函数退出时,new_next (counted_node_ptr) 如何在队列中分配和维护,因为它看起来是堆栈分配的。

c++ concurrency queue lock-free
1个回答
0
投票

根据

counted_node_ptr
的定义:

struct counted_node_ptr 
{
    int external_count;
    node* ptr; //This points to the actual nodes. 
};

这只是一个

int
和一个指针,所以复制它既便宜又容易。重要的是,摧毁一个
counted_node_ptr
not导致
ptr
被删除。

compare_exchange_strong
被调用时,它可能导致
new_next
被复制到
old_tail.ptr->next
。这只需要复制
int
和指针成员。

我的想法是,如果要向动态队列添加内容,则应该在动态堆而不是堆栈上分配每个新节点。

这里正在动态分配节点:

new_next.ptr = new node;

new_next
只是将动态分配与计数相关联。

© www.soinside.com 2019 - 2024. All rights reserved.