我试图根据由梅德Vyukov this one written in C实现一个无锁多单生产者消费者队列。
我写了一个测试,到目前为止几乎工程。但消费者往往错过只有一个项目,第一或第二。有时候,消费者会怀念的投入的一半。
因为它是现在,它不锁免费。它锁定在每次使用new
操作者的时间,但我希望得到它的工作和插科打诨与分配器之前写一些更详尽的测试。
// src/MpscQueue.hpp
#pragma once
#include <memory>
#include <atomic>
#include <optional>
/**
* Adapted from http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
* @tparam T
*/
template< typename T >
class MpscQueue {
public:
MpscQueue() {
stub.next.store( nullptr );
head.store( &stub );
tail = &stub;
}
void push( const T& t ) {
emplace( t );
}
void push( T&& t ) {
emplace( std::move( t ));
}
template< typename ... Args >
void emplace( Args...args ) {
auto node = new Node{ std::make_unique<T>( std::forward<Args>( args )... ), nullptr };
push( node );
}
/**
* Returns an item from the queue and returns a unique pointer to it.
*
* If the queue is empty returns a unique pointer set to nullptr
*
* @return A unique ptr to the popped item
*/
std::unique_ptr<T> pop() {
Node* tailCopy = tail;
Node* next = tailCopy->next.load();
auto finalize = [ & ]() {
tail = next;
std::unique_ptr<Node> p( tailCopy ); // free the node memory after we return
return std::move( tail->value );
};
if ( tailCopy == &stub ) {
if ( next == nullptr ) return nullptr;
tail = next;
tailCopy = next;
next = next->next;
}
if ( next ) return std::move( finalize());
if ( tail != head.load()) return nullptr;
push( &stub );
next = tailCopy->next;
return next ? std::move( finalize()) : nullptr;
}
private:
struct Node {
std::unique_ptr<T> value;
std::atomic<Node*> next;
};
void push( Node* node ) {
Node* prev = head.exchange( node );
prev->next = node;
}
Node stub;
std::atomic<Node*> head;
Node* tail;
};
// test/main.cpp
#pragma clang diagnostic push
#pragma ide diagnostic ignored "OCUnusedMacroInspection"
#define BOOST_TEST_MODULE test_module
#pragma clang diagnostic pop
#include <boost/test/unit_test.hpp>
// test/utils.hpp
#pragma once
#include <vector>
template< class T >
void removeFromBothIfIdentical( std::vector<T>& a, std::vector<T>& b ) {
size_t i = 0;
size_t j = 0;
while ( i < a.size() && j < b.size()) {
if ( a[ i ] == b[ j ] ) {
a.erase( a.begin() + i );
b.erase( b.begin() + j );
}
else if ( a[ i ] < b[ j ] ) ++i;
else if ( a[ i ] > b[ j ] ) ++j;
}
}
namespace std {
template< typename T >
std::ostream& operator<<( std::ostream& ostream, const std::vector<T>& container ) {
if ( container.empty())
return ostream << "[]";
ostream << "[";
std::string_view separator;
for ( const auto& item: container ) {
ostream << item << separator;
separator = ", ";
}
return ostream << "]";
}
}
template< class T >
std::vector<T> extractDuplicates( std::vector<T>& container ) {
auto iter = std::unique( container.begin(), container.end());
std::vector<T> duplicates;
std::move( iter, container.end(), back_inserter( duplicates ));
return duplicates;
}
#define CHECK_EMPTY( container, message ) \
BOOST_CHECK_MESSAGE( (container).empty(), (message) << ": " << (container) )
// test/MpscQueue.cpp
#pragma ide diagnostic ignored "cert-err58-cpp"
#include <thread>
#include <numeric>
#include <boost/test/unit_test.hpp>
#include "../src/MpscQueue.hpp"
#include "utils.hpp"
using std::thread;
using std::vector;
using std::back_inserter;
BOOST_AUTO_TEST_SUITE( MpscQueueTestSuite )
BOOST_AUTO_TEST_CASE( two_producers ) {
constexpr int until = 1000;
MpscQueue<int> queue;
thread producerEven( [ & ]() {
for ( int i = 0; i < until; i += 2 )
queue.push( i );
} );
thread producerOdd( [ & ]() {
for ( int i = 1; i < until; i += 2 )
queue.push( i );
} );
vector<int> actual;
thread consumer( [ & ]() {
using namespace std::chrono_literals;
std::this_thread::sleep_for( 2ms );
while ( auto n = queue.pop())
actual.push_back( *n );
} );
producerEven.join();
producerOdd.join();
consumer.join();
vector<int> expected( until );
std::iota( expected.begin(), expected.end(), 0 );
std::sort( actual.begin(), actual.end());
vector<int> duplicates = extractDuplicates( actual );
removeFromBothIfIdentical( expected, actual );
CHECK_EMPTY( duplicates, "Duplicate items" );
CHECK_EMPTY( expected, "Missing items" );
CHECK_EMPTY( actual, "Extra items" );
}
BOOST_AUTO_TEST_SUITE_END()
我多生产,低于单次消费例子是写在阿达。我提供这个作为虚拟“伪代码”为您考虑的来源。这个例子有三种文件。
的例子实现具有多个生产者,共享缓冲器,并且记录由生产者产生的串的单个消费者一个简单的数据记录器。
第一文件是共享缓冲器中的包规范。阿达包规范定义在包中定义的实体的API。在这种情况下,实体是一个受保护的缓冲区和程序停止记录器。
-----------------------------------------------------------------------
-- Asynchronous Data Logger
-----------------------------------------------------------------------
with Ada.Strings.Unbounded; use Ada.Strings.Unbounded;
package Async_Logger is
type Queue_Index is mod 256;
type Queue_T is array (Queue_Index) of Unbounded_String;
protected Buffer is
entry Put (Log_Entry : in String);
entry Get (Stamped_Entry : out Unbounded_String);
private
Queue : Queue_T;
P_Index : Queue_Index := 0;
G_Index : Queue_Index := 0;
Count : Natural := 0;
end Buffer;
procedure Stop_Logging;
end Async_Logger;
在受保护的缓冲区中的条目允许的任务(即线程)写入缓冲区,从缓冲区中读取。条目自动执行缓冲的所有必要的锁定控制。
的缓冲器代码和Stop_Logging程序的执行在所述封装主体中实现。 ,做记录的消费者也在工作主体实施,使得消费者看不见的生产线。
with Ada.Calendar; use Ada.Calendar;
with Ada.Calendar.Formatting; use Ada.Calendar.Formatting;
with Ada.Text_IO; use Ada.Text_IO;
package body Async_Logger is
------------
-- Buffer --
------------
protected body Buffer is
---------
-- Put --
---------
entry Put (Log_Entry : in String) when Count < Queue_Index'Modulus is
T_Stamp : Time := Clock;
Value : Unbounded_String :=
To_Unbounded_String
(Image (Date => T_Stamp, Include_Time_Fraction => True) & " : " &
Log_Entry);
begin
Queue (P_Index) := Value;
P_Index := P_Index + 1;
Count := Count + 1;
end Put;
---------
-- Get --
---------
entry Get (Stamped_Entry : out Unbounded_String) when Count > 0 is
begin
Stamped_Entry := Queue (G_Index);
G_Index := G_Index + 1;
Count := Count - 1;
end Get;
end Buffer;
task Logger is
entry Stop;
end Logger;
task body Logger is
Phrase : Unbounded_String;
begin
loop
select
accept Stop;
exit;
else
select
Buffer.Get (Phrase);
Put_Line (To_String (Phrase));
or
delay 0.01;
end select;
end select;
end loop;
end Logger;
procedure Stop_Logging is
begin
Logger.Stop;
end Stop_Logging;
end Async_Logger;
认沽条目有监护条件允许时,缓冲区未满入门到只执行。在获取入口有监护条件允许时,缓冲区为空进入,只执行。
命名记录器的任务是消费者任务。它运行,直到其停止进入被调用。
该Stop_Logging过程调用记录仪的止损挂单。
所述第三文件是用来测试Async_Logger包“主”过程。此文件创建两个生产者,P1和P2。每写10个消息缓冲区,然后这些生产者退出。
with Async_Logger; use Async_Logger;
procedure Async_Test is
task P1;
task P2;
task body P1 is
begin
for I in 1..10 loop
Buffer.Put(I'Image);
delay 0.01;
end loop;
end P1;
task body P2 is
Num : Float := 0.0;
begin
for I in 1..10 loop
Buffer.Put(Num'Image);
Num := Num + 1.0;
delay 0.01;
end loop;
end P2;
begin
delay 0.2;
Stop_Logging;
end Async_Test;
该Async_Test过程只需等待0.2秒然后调用Stop_Logging。
这个程序的运行的输出是:
2019-02-11 18:35:01.83 : 1
2019-02-11 18:35:01.83 : 0.00000E+00
2019-02-11 18:35:01.85 : 1.00000E+00
2019-02-11 18:35:01.85 : 2
2019-02-11 18:35:01.87 : 3
2019-02-11 18:35:01.87 : 2.00000E+00
2019-02-11 18:35:01.88 : 3.00000E+00
2019-02-11 18:35:01.88 : 4
2019-02-11 18:35:01.90 : 5
2019-02-11 18:35:01.90 : 4.00000E+00
2019-02-11 18:35:01.92 : 6
2019-02-11 18:35:01.92 : 5.00000E+00
2019-02-11 18:35:01.93 : 6.00000E+00
2019-02-11 18:35:01.93 : 7
2019-02-11 18:35:01.95 : 7.00000E+00
2019-02-11 18:35:01.95 : 8
2019-02-11 18:35:01.96 : 8.00000E+00
2019-02-11 18:35:01.96 : 9
2019-02-11 18:35:01.98 : 10
2019-02-11 18:35:01.98 : 9.00000E+00