我目前正在研究 Anthony Williams 的 C++ concurrency in action (2E),我有很多技术问题,但到目前为止,即使它有效,这也是最奇怪的。
#pragma once
template<typename T>
class CLockFreeQueue
{
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr; //This points to the actual nodes.
};
std::atomic<counted_node_ptr> mHead;
std::atomic<counted_node_ptr> mTail;
struct node_counter
{
unsigned internal_count : 30;
unsigned external_counters : 2;
};
//This is the node that holds make up the list
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;
node()
{
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
counted_node_ptr dummy = { 0 };
next.store(dummy);
data.store(nullptr);
}
void release_ref()
{
node_counter old_counter =
count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.internal_count;
} while (!count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
//If both values are zero, we can delete
if (!new_counter.internal_count &&
!new_counter.external_counters)
{
delete this;
}
}
};
static void increase_external_count(
std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
} while (!counter.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
static void free_external_counter(counted_node_ptr& old_node_ptr)
{
node* const ptr = old_node_ptr.ptr;
int const count_increase = old_node_ptr.external_count - 2;
node_counter old_counter =
ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
} while (!ptr->count.compare_exchange_strong(
old_counter, new_counter,
std::memory_order_acquire, std::memory_order_relaxed));
if (!new_counter.internal_count &&
!new_counter.external_counters)
{
delete ptr;
}
}
void set_new_tail(counted_node_ptr& old_tail,
counted_node_ptr const& new_tail)
{
node* const current_tail_ptr = old_tail.ptr;
while (!mTail.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr);
if (old_tail.ptr == current_tail_ptr)
free_external_counter(old_tail);
else
current_tail_ptr->release_ref();
}
public:
CLockFreeQueue() {
counted_node_ptr h;
h.ptr = new node;
h.external_count = 1;
mHead.store(h);
mTail.store(mHead.load());
}
CLockFreeQueue(const CLockFreeQueue&) = delete;
CLockFreeQueue& operator=(const CLockFreeQueue&) = delete;
~CLockFreeQueue() {
while (pop());
}
void push(T new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = mTail.load();
for (;;)
{
increase_external_count(mTail, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(
old_data, new_data.get())) //if successfully changed the old_tail.ptr->data value atomically
{
counted_node_ptr old_next = { 0 };
if (!old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
{
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next = { 0 };
if (old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
{
old_next = new_next;
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}
std::unique_ptr<T> pop()
{
counted_node_ptr old_head = mHead.load(std::memory_order_relaxed);
for (;;)
{
increase_external_count(mHead, old_head);
node* const ptr = old_head.ptr;
if (ptr == mTail.load().ptr) //If the list is empty
{
ptr->release_ref();
return std::unique_ptr<T>();
}
counted_node_ptr next = ptr->next.load();
if (mHead.compare_exchange_strong(old_head, next))
{
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
return std::unique_ptr<T>(res);
}
ptr->release_ref();
}
}
};
现在我设法让他的最终无锁队列工作,但有些事情让我感到困惑——它如何在 push() 中获得一个新的 counted_node_ptr?
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
我的意思是看起来每个节点都封装了一个 counted_node_ptr 但在 push() 中他似乎在函数中使用堆栈分配来创建一个新的 counted_node_ptr (new_next) 然后使用它交换尾巴
if (!old_tail.ptr->next.compare_exchange_strong(
old_next, new_next))
push() 退出时,new_next 不会无效(删除)吗? compare_exchange_strong 是否创建了它的新实例?这让我很好奇。
我的代码有效,我测试了(不确定是否有内存泄漏)但是这个让我感到困惑。
我知道这可能是一个平凡的问题,特别是因为它正在工作,但我只是好奇。我的想法是,如果你要向动态队列添加一些东西,你应该在动态堆而不是堆栈上分配每个新节点。
当 push() 函数退出时,new_next (counted_node_ptr) 如何在队列中分配和维护,因为它看起来是堆栈分配的。
根据
counted_node_ptr
的定义:
struct counted_node_ptr
{
int external_count;
node* ptr; //This points to the actual nodes.
};
这只是一个
int
和一个指针,所以复制它既便宜又容易。重要的是,摧毁一个counted_node_ptr
将not导致ptr
被删除。
当
compare_exchange_strong
被调用时,它可能导致new_next
被复制到old_tail.ptr->next
。这只需要复制 int
和指针成员。
我的想法是,如果要向动态队列添加内容,则应该在动态堆而不是堆栈上分配每个新节点。
这里正在动态分配节点:
new_next.ptr = new node;
new_next
只是将动态分配与计数相关联。