计算N组交叉点的快速算法

问题描述 投票:1回答:2

我有n套A0,A2,...... An-1持有一套E的物品。

我将配置C定义为由n位组成的整数,因此C的值介于0和2 ^ n-1之间。现在,我定义以下内容:

(C)   an item e of E is in configuration C 
       <=> for each bit b of C, if b==1 then e is in Ab, else e is not in Ab

例如,对于n = 3,配置C = 011对应于在A0和A1中但不在A2中的E项(NOT是重要的)

C[bitmap]是在集合中具有完全存在/不存在模式的元素的数量。 C[001]是A0中任何其他集合中的元素数量。


另一个可能的定义是:

(V)   an item e of E is in configuration V 
       <=> for each bit b of V, if b==1 then e is in Ab

例如,对于n = 3,(V)配置V = 011对应于A0和A1中的E项

V[bitmap]是所选集合的交集计数。 (即位图为真的所有集合中有多少个元素的计数。)V[001]是A0中元素的数量。 V[011]是A0和A1中元素的数量,无论它们是否也在A2中。


在下文中,第一图片显示组A0,A1和A2的项目,第二图片显示(C)配置的大小,第三图片显示(V)配置的大小。

sets examples enter image description here enter image description here

我也可以通过两个向量中的任何一个来表示配置:

C[001]= 5       V[001]=14
C[010]=10       V[010]=22
C[100]=11       V[100]=24
C[011]= 2       V[011]= 6
C[101]= 3       V[101]= 7
C[110]= 6       V[110]=10
C[111]= 4       V[111]= 4

我想要的是编写一个C / C ++函数,尽可能高效地将C转换为V.一个天真的方法可能是以下'transfo'函数,显然在O(4 ^ n):

#include <vector>
#include <cstdio>

using namespace std;

vector<size_t> transfo (const vector<size_t>& C)
{
    vector<size_t> V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
} 


int main()
{
    vector<size_t> C = { 
        /* 000 */  0,
        /* 001 */  5,
        /* 010 */ 10,
        /* 011 */  2,
        /* 100 */ 11,
        /* 101 */  3,
        /* 110 */  6,
        /* 111 */  4
    };

    vector<size_t> V = transfo (C);

    for (size_t i=1; i<V.size(); i++)  {  printf ("[%2ld]  C=%2ld   V=%2ld\n", i, C[i], V[i]);  }
}

我的问题是:对于将矢量C转换为矢量V,是否存在比天真更有效的算法?这种“好”算法的复杂性是什么?

请注意,我可能对任何SIMD解决方案感兴趣。

c++ algorithm set intersection simd
2个回答
5
投票

好吧,你正在尝试计算2n个值,所以你不能做得比O(2n)好。

天真的方法从观察开始,即通过固定X中的所有1位并迭代0位所有可能的值来获得V [X]。例如,

V[010] = C[010] + C[011] + C[110] + C[111]

但是这种方法对V的每个元素执行O(2n)加法,产生O(4n)的总复杂度。

这是一个O(n×2n)算法。如果存在O(2n)算法,我也很好奇。

n = 4。让我们考虑V对C的完整表。下表中的每一行对应于一个V值,该值是通过对用*标记的列求和来计算的。 *符号的布局可以很容易地从天真的方法推导出来。

    |0000|0001|0010|0011|0100|0101|0110|0111||1000|1001|1010|1011|1100|1101|1110|1111
0000| *  | *  | *  | *  | *  | *  | *  | *  || *  | *  | *  | *  | *  | *  | *  | *  
0001|    | *  |    | *  |    | *  |    | *  ||    | *  |    | *  |    | *  |    | *  
0010|    |    | *  | *  |    |    | *  | *  ||    |    | *  | *  |    |    | *  | *  
0011|    |    |    | *  |    |    |    | *  ||    |    |    | *  |    |    |    | *  
0100|    |    |    |    | *  | *  | *  | *  ||    |    |    |    | *  | *  | *  | *  
0101|    |    |    |    |    | *  |    | *  ||    |    |    |    |    | *  |    | *  
0110|    |    |    |    |    |    | *  | *  ||    |    |    |    |    |    | *  | *  
0111|    |    |    |    |    |    |    | *  ||    |    |    |    |    |    |    | *  
-------------------------------------------------------------------------------------
1000|    |    |    |    |    |    |    |    || *  | *  | *  | *  | *  | *  | *  | *  
1001|    |    |    |    |    |    |    |    ||    | *  |    | *  |    | *  |    | *  
1010|    |    |    |    |    |    |    |    ||    |    | *  | *  |    |    | *  | *  
1011|    |    |    |    |    |    |    |    ||    |    |    | *  |    |    |    | *  
1100|    |    |    |    |    |    |    |    ||    |    |    |    | *  | *  | *  | *  
1101|    |    |    |    |    |    |    |    ||    |    |    |    |    | *  |    | *  
1110|    |    |    |    |    |    |    |    ||    |    |    |    |    |    | *  | *  
1111|    |    |    |    |    |    |    |    ||    |    |    |    |    |    |    | *  

请注意,左上角,右上角和右下角包含相同的布局。因此,我们可以批量执行一些计算,如下所示:

  1. 计算表格的下半部分(右下角)。
  2. 将值添加到上半部分。
  3. 计算左上角。

如果我们让q = 2n,那么反复出现的复杂性就是

T(q)= 2T(q / 2)+ O(q)

这解决了使用Master Theorem

T(q)= O(q log q)

或者,就n而言,

T(n)= O(n×2n)


3
投票

根据@CătălinFrâncu的伟大观察,我写了两个转换的递归实现(参见下面的代码):

  • transfo_recursive:非常简单的递归实现
  • transfo_avx2:仍然是递归的,但是使用AVX2作为n = 3的递归的最后一步

我在这里建议计数器的大小以32位编码,n值可以增长到28。

我还根据自己对递归行为的观察写了一个迭代实现(transfo_iterative)。实际上,我猜它接近@chtz提出的非递归实现。

这是基准代码:

// compiled with: g++ -O3   intersect.cpp -march=native -mavx2 -lpthread -DNDEBUG

#include <vector>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <thread>
#include <algorithm>
#include <sys/times.h>
#include <immintrin.h>
#include <boost/align/aligned_allocator.hpp>

using namespace std;

////////////////////////////////////////////////////////////////////////////////

typedef u_int32_t Count;

// Note: alignment is important for AVX2
typedef std::vector<Count,boost::alignment::aligned_allocator<Count, 8*sizeof(Count)>>  CountVector;

typedef void (*callback) (CountVector::pointer C, size_t q);
typedef vector<pair<const char*, callback>> FunctionsVector;

unsigned int randomSeed = 0;

////////////////////////////////////////////////////////////////////////////////
double timestamp()
{
    struct timespec timet;
    clock_gettime(CLOCK_MONOTONIC, &timet);
    return timet.tv_sec + (timet.tv_nsec/ 1000000000.0);
}

////////////////////////////////////////////////////////////////////////////////
CountVector getRandomVector (size_t n)
{
    // We use the same seed, so we'll get the same random values
    srand (randomSeed);

    // We fill a vector of size q=2^n with random values
    CountVector C(1ULL<<n);
    for (size_t i=0; i<C.size(); i++)  { C[i] = rand() % (1ULL<<(8*sizeof(Count))); }
    return C;
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block (CountVector::pointer C, size_t q)
{
    for (size_t i=0; i<q/2; i++)   {  C[i] += C[i+q/2]; }
}

////////////////////////////////////////////////////////////////////////////////
void copy_add_block_avx2 (CountVector::pointer C, size_t q)
{
    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    size_t imax = q/(2*8);

    for (size_t i=0; i<imax; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Naive approach : O(4^n)
////////////////////////////////////////////////////////////////////////////////
CountVector transfo_naive (const CountVector& C)
{
    CountVector V (C.size());

    for (size_t i=0; i<C.size(); i++)
    {
        V[i] = 0;

        for (size_t j=0; j<C.size(); j++)
        {
            if ((j&i)==i)  { V[i] += C[j]; }
        }
    }

    return V;
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recursive (CountVector::pointer C, size_t q)
{
    if (q>1)
    {
        transfo_recursive (C+q/2, q/2);
        transfo_recursive (C,     q/2);

        copy_add_block (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Iterative approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_iterative (CountVector::pointer C, size_t q)
{
    size_t i = 0;

    for (size_t n=q; n>1; n>>=1, i++)
    {
        size_t d = 1<<i;

        for (ssize_t j=q-1-d; j>=0; j--)
        {
            if ( ((j>>i)&1)==0) { C[j] += C[j+d]; }
        }
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive AVX2 approach : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////

#define ROTATE1(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,7,6,5,4,3,2,1))
#define ROTATE2(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,7,6,5,4,3,2))
#define ROTATE4(s)  _mm256_permutevar8x32_epi32 (s, _mm256_set_epi32(0,0,0,0,7,6,5,4))

void transfo_avx2 (CountVector::pointer V, size_t N)
{
    __m256i k1 = _mm256_set_epi32 (0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF,0,0xFFFFFFFF);
    __m256i k2 = _mm256_set_epi32 (0,0,0xFFFFFFFF,0xFFFFFFFF,0,0,0xFFFFFFFF,0xFFFFFFFF);
    __m256i k4 = _mm256_set_epi32 (0,0,0,0,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF,0xFFFFFFFF);

    if (N==8)
    {
        __m256i* source = (__m256i*) (V);

        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE1(*source),k1));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE2(*source),k2));
        *source = _mm256_add_epi32 (*source, _mm256_and_si256(ROTATE4(*source),k4));
    }
    else // if (N>8)
    {
        transfo_avx2 (V+N/2, N/2);
        transfo_avx2 (V,     N/2);

        copy_add_block_avx2  (V, N);
    }
}

#define ROTATE1_AND(s)  _mm256_srli_epi64 ((s), 32)  // odd 32bit elements
#define ROTATE2_AND(s)  _mm256_bsrli_epi128 ((s), 8) // high 64bit halves
// gcc doesn't have _mm256_zextsi128_si256
// and _mm256_castsi128_si256 doesn't guarantee zero extension
// vperm2i118 can do the same job as vextracti128, but is slower on Ryzen
#ifdef __clang__                                      // high 128bit lane
#define ROTATE4_AND(s)  _mm256_zextsi128_si256(_mm256_extracti128_si256((s),1))
#else
//#define ROTATE4_AND(s)  _mm256_castsi128_si256(_mm256_extracti128_si256((s),1))
#define ROTATE4_AND(s)  _mm256_permute2x128_si256((s),(s),0x81)  // high bit set = zero that lane
#endif

void transfo_avx2_pcordes (CountVector::pointer C, size_t q)
{
    if (q==8)
    {
        __m256i* source = (__m256i*) (C);
        __m256i tmp = *source;
        tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
        tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
        *source = tmp;
    }
    else //if (N>8)
    {
        transfo_avx2_pcordes (C+q/2, q/2);
        transfo_avx2_pcordes (C,     q/2);

        copy_add_block_avx2  (C, q);
    }
}

////////////////////////////////////////////////////////////////////////////////
// Template specialization (same as transfo_avx2_pcordes)
////////////////////////////////////////////////////////////////////////////////
template <int n>
void transfo_template (__m256i* C)
{
    const size_t q = 1ULL << n;

    transfo_template<n-1> (C);
    transfo_template<n-1> (C + q/2);

    __m256i* target = (__m256i*) (C);
    __m256i* source = (__m256i*) (C+q/2);

    for (size_t i=0; i<q/2; i++)
    {
        target[i] = _mm256_add_epi32 (source[i], target[i]);
    }
}

template <>
void transfo_template<0> (__m256i* C)
{
    __m256i* source = (__m256i*) (C);
    __m256i tmp = *source;
    tmp = _mm256_add_epi32 (tmp, ROTATE1_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE2_AND(tmp));
    tmp = _mm256_add_epi32 (tmp, ROTATE4_AND(tmp));
    *source = tmp;
}

void transfo_recur_template (CountVector::pointer C, size_t q)
{
#define CASE(n)     case 1ULL<<n: transfo_template<n> ((__m256i*)C);   break;

    q = q / 8; // 8 is the number of 32 bits items in the AVX2 registers

    // We have to 'link' the dynamic value of q with a static template specialization
    switch (q)
    {
                  CASE( 1); CASE( 2); CASE( 3); CASE( 4); CASE( 5); CASE( 6); CASE( 7); CASE( 8); CASE( 9);
        CASE(10); CASE(11); CASE(12); CASE(13); CASE(14); CASE(15); CASE(16); CASE(17); CASE(18); CASE(19);
        CASE(20); CASE(21); CASE(22); CASE(23); CASE(24); CASE(25); CASE(26); CASE(27); CASE(28); CASE(29);

        default: printf ("transfo_template undefined for q=%ld\n", q);  break;
    }
}

////////////////////////////////////////////////////////////////////////////////
// Recursive approach multithread : O(n.2^n)
////////////////////////////////////////////////////////////////////////////////
void transfo_recur_thread (CountVector::pointer C, size_t q)
{
    std::thread t1 (transfo_recur_template, C+q/2, q/2);
    std::thread t2 (transfo_recur_template, C,     q/2);
    t1.join();
    t2.join();

    copy_add_block_avx2 (C, q);
}

////////////////////////////////////////////////////////////////////////////////
void header (const char* title, const FunctionsVector& functions)
{
    printf ("\n");
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%s\n", title);
    for (size_t i=0; i<functions.size(); i++)  { printf ("------------------"); }  printf ("\n");
    printf ("%3s\t", "# n");
    for (auto fct : functions)  {  printf ("%20s\t", fct.first);  }
    printf ("\n");
}

////////////////////////////////////////////////////////////////////////////////
// Check that alternative implementations provide the same result as the naive one
////////////////////////////////////////////////////////////////////////////////
void check (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("CHECK (0 values means similar to naive approach)", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);

        CountVector reference = transfo_naive (getRandomVector(n));

        for (auto fct : functions)
        {
            // We call the (in place) transformation
            CountVector C = getRandomVector(n);
            (*fct.second) (C.data(), C.size());

            int nbDiffs= 0;
            for (size_t i=0; i<C.size(); i++)
            {
                if (reference[i]!=C[i]) { nbDiffs++; }
            }
            printf ("%20ld\t", nbDiffs);
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
// Performance test
////////////////////////////////////////////////////////////////////////////////
void performance (const FunctionsVector& functions, size_t nmin, size_t nmax)
{
    header ("PERFORMANCE", functions);

    for (size_t n=nmin; n<=nmax; n++)
    {
        printf ("%3ld\t", n);
        for (auto fct : functions)
        {
            // We compute the average time for several executions
            // We use more executions for small n values in order
            // to have more accurate results
            size_t nbRuns = 1ULL<<(2+nmax-n);
            vector<double> timeValues;

            // We run the test several times
            for (size_t r=0; r<nbRuns; r++)
            {
                // We don't want to measure time for vector fill
                CountVector C = getRandomVector(n);

                double t0 = timestamp();
                (*fct.second) (C.data(), C.size());
                double t1 = timestamp();

                timeValues.push_back (t1-t0);
            }

            // We sort the vector of times in order to get the median value
            std::sort (timeValues.begin(), timeValues.end());

            double median = timeValues[timeValues.size()/2];
            printf ("%20lf\t", log(1000.0*1000.0*median)/log(2));
        }
        printf ("\n");
    }
}

////////////////////////////////////////////////////////////////////////////////
//
////////////////////////////////////////////////////////////////////////////////
int main (int argc, char* argv[])
{
    size_t nmin = argc>=2 ? atoi(argv[1]) : 14;
    size_t nmax = argc>=3 ? atoi(argv[2]) : 28;

    // We get a common random seed
    randomSeed = time(NULL);

    FunctionsVector functions = {
        make_pair ("transfo_recursive",        transfo_recursive),
        make_pair ("transfo_iterative",        transfo_iterative),
        make_pair ("transfo_avx2",             transfo_avx2),
        make_pair ("transfo_avx2_pcordes",     transfo_avx2_pcordes),
        make_pair ("transfo_recur_template",   transfo_recur_template),
        make_pair ("transfo_recur_thread",     transfo_recur_thread)
    };

    // We check for some n that alternative implementations
    // provide the same result as the naive approach
    check (functions, 5, 15);

    // We run the performance test
    performance (functions, nmin, nmax);
}

这是性能图:enter image description here

可以观察到,即使与AVX2版本相比,简单的递归实现也相当不错。迭代实现有点令人失望,但我没有做出很大的努力来优化它。

最后,对于我自己的32位计数器用例和n值高达28的情况,与O(4 ^ n)中的初始“天真”方法相比,这些实现对我来说显然是可行的。


更新

在@PeterCordes和@chtz的一些评论之后,我添加了以下实现:

  • transfo-avx2-pcordes:与transfo-avx2相同,具有一些AVX2优化功能
  • transfo-recur-template:与transfo-avx2-pcordes相同,但使用C ++模板专门化来实现递归
  • transfo-recur-thread:对transfo-recur-template的两个初始递归调用使用多线程

以下是更新的基准测试结果:

enter image description here

关于这个结果的一些评论:

  1. AVX2实现在逻辑上是最好的选择,但可能没有最大潜在的x8加速,计数器为32位
  2. 在AVX2实现中,模板专业化带来了一点点加速,但是对于更大的n值,它几乎会消失
  3. 简单的双线程版本对于n <20;对于n> = 20,总有一点加速但远离潜在的2倍。
© www.soinside.com 2019 - 2024. All rights reserved.