C++ - How to speed up (using x86 SIMD) batch variable-length integer encoding/decoding (runnable benchmark)

Problem description (votes: 0, answers: 2)

I have this encoding method. It works by encoding small blocks of 16x int64_t into a small block of 16 flag nibbles, packed into 8 bytes, followed by a payload of 1 or more bytes per input int64_t:

  • 1 byte is reserved for flags (1 byte can store the flags of 2 values)
  • sign(value) is stored in the flag
  • abs(value) is stored in the buffer, using a variable number of bytes
  • the number of bytes needed is stored in the flag (a 3-bit field; a worked example follows)
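
For concreteness, a small worked example of the format (my own illustration, matching the encode function below):

// value = -300: abs(value) = 300 = 0x012C  -> 2 payload bytes 0x2C, 0x01 (little-endian)
// flag nibble  = (sign << 3) | byte_cnt   =  0b1000 | 2 = 0b1010
// two nibbles are packed into each flag byte: (high_flag << 4) | low_flag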

This example benchmark is simplified. I have tested my use case with real data: it is faster than LEB128 (a.k.a. varint) and lz4, and it compresses better. So let's focus on this encoding method, or something similar.

How can this code be improved? I care mostly about decompression speed. Changes to the encoding/decoding format (or to another format) are fine if they make it faster. I'd like to know whether SIMD can be used in this situation.

Special notes:

  • Most abs(value) have length <= 2 bytes. In my actual use case, the data is mostly …
  • 99% of the time, 4 consecutive abs(value) can be represented with <= 7 bytes. Similarly, 99% of the time, 16 consecutive abs(value) can be represented with <= 32 bytes.
  • The two important functions are encode and decode_all.
#include <iostream>
#include <string>
#include <cstring>
#include <algorithm>
#include <vector>
#include <chrono>
#include <cstdlib>   // rand, srand, abs
#include <ctime>     // time
using namespace std;

namespace {
    class MyTimer {
    std::chrono::time_point<std::chrono::system_clock> start;

public:
    void startCounter() {
        start = std::chrono::system_clock::now();
    }

    int64_t getCounterNs() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count();
    }

    int64_t getCounterMs() {
        return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count();
    }

    double getCounterMsPrecise() {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count()
                / 1000000.0;
    }
};
}

template <typename T>
void leb128_encode(uint8_t* data, T value, int &idx)
{
  bool more = true;
  bool negative = value < 0;
  constexpr int size = sizeof(value) * 8;

  while (more) {
    uint8_t byte = value & 0x7F;
    value >>= 7;
    if (negative) value |= (~0LL << (size - 7));

    if ((value == 0 && (byte & 0x40) == 0) || (value == -1 && (byte & 0x40) != 0)) {
      more = false;
    } else {
        byte |= 0x80;
    }

    data[idx++] = byte;
  }
}

template <typename T>
void leb128_decode(uint8_t* data, T &value, int &idx)
{
  value = 0;
  int shift = 0;
  constexpr int size = sizeof(value) * 8;
  uint8_t byte;

  do {
    byte = data[idx++];
    value |= int64_t(byte & 0x7F) << shift;
    shift += 7;
  } while (byte & 0x80);

  if (shift < size && (byte & 0x40)) {
    value |= (~0LL << shift);
  }
}

int n_msg = 100'000'000;
constexpr int BLK_SIZE = 16;
uint8_t* buffer;

int64_t *original;
int64_t *decoded;
int64_t *dummy_array;

// encode 1 value using variable length encoding,
// storing the result in a flag
void encode(int64_t value, uint8_t &flag, size_t &idx)
{
  bool negative = value < 0;
  value = abs(value);

  uint8_t byte_cnt = 0;
  while (value > 0) {
    buffer[idx] = value & 0xFF; // lowest byte
    idx++;
    value >>= 8;
    byte_cnt++;
  }

  // special cases. Since we only have 3 bits to represent 9 values (0->8),
  // we choose to group case 7-8 together because they're rarest.  
  if (byte_cnt == 7) {
    buffer[idx] = 0;
    idx++;
  } else if (byte_cnt == 8) {
    // since we only have 3 bits, we use 7 to represent 8 bytes
    byte_cnt = 7;
  }

  flag = byte_cnt; // bit 0-2 represent length
  if (negative) flag |= 0b1000; // bit 3 represent sign (1 means negative)  
}

void __attribute__ ((noinline)) encode_all()
{
  size_t idx = 0;
  // encode in blocks of 16 values
  for (size_t i = 0; i < n_msg; i += 16) {
    size_t flag_idx = idx;
    idx += 8; // first 8 bytes of a block are for sign/length flags
    for (int t = 0; t < BLK_SIZE; t += 2) {
      uint8_t low_flag, high_flag;
      encode(original[i + t], low_flag, idx);
      encode(original[i + t + 1], high_flag, idx);
      buffer[flag_idx + t / 2] = (high_flag << 4) | low_flag;
    }
  }
  cout << "Compression ratio = " << (double(idx) / (n_msg * 8)) << "\n";
}

template <typename T>
void extract_flag(uint8_t flag, T &low_sign, T &low_length, T &high_sign, T &high_length)
{
  low_sign = flag & 0b1000;
  low_length = flag & 0b0111;
  high_sign = (flag >> 4) & 0b1000;
  high_length = (flag >> 4) & 0b0111;
}

void __attribute__ ((noinline)) decode_all()
{
  static constexpr int64_t mult[] = {
    1, 1LL << 8, 1LL << 16, 1LL << 24, 1LL << 32, 1LL << 40, 1LL << 48, 1LL << 56
  };

  size_t idx = 0, num_idx = 0;
  for (size_t i = 0; i < n_msg; i += 16) {
    // first 8 bytes of a block are flags
    int signs[BLK_SIZE], lens[BLK_SIZE];
    for (int t = 0; t < BLK_SIZE; t += 2) {
      extract_flag(buffer[idx], signs[t], lens[t], signs[t + 1], lens[t + 1]);
      idx++;
    }
    for (int t = 0; t < BLK_SIZE; t++) if (lens[t] == 7) lens[t] = 8; // special case

    // decode BLK_SIZE values  
    for (int t = 0; t < BLK_SIZE; t++) {
      int64_t value = 0;
      for (int j = 0; j < lens[t]; j++) value += mult[j] * buffer[idx + j];
      if (signs[t]) value = -value;
      decoded[num_idx + t] = value;
      idx += lens[t];
    }
    num_idx += BLK_SIZE;
  }
}

void __attribute__ ((noinline)) gen_data()
{
  for (size_t i = 0; i < n_msg; i++) {
    if (rand() % 100 == 0) {
      original[i] = int64_t(rand()) * 1'000'000'000;
    } else {
      original[i] = rand() % 70'000;
    }
    if (rand() % 3 == 0) original[i] *= -1;
  }
}

void __attribute__ ((noinline)) check()
{
  for (size_t i = 0; i < n_msg; i++) if (original[i] != decoded[i]) {
    cout << "Wrong at " << i << " " << original[i] << " " << decoded[i] << "\n";
    exit(1);
  }
}

int64_t volatile dummy = 42;
constexpr int N_DUMMY = 8'000'000;
void __attribute__ ((noinline)) clear_cache()
{
  for (int i = 0; i < N_DUMMY; i++) dummy_array[i] = dummy;
  std::sort(dummy_array, dummy_array + N_DUMMY);
  dummy = dummy_array[rand() % N_DUMMY];
}

int main()
{  
  srand(time(NULL));
  MyTimer timer;
  buffer = new uint8_t[n_msg * 10];
  original = new int64_t[n_msg];
  decoded = new int64_t[n_msg];
  dummy_array = new int64_t[N_DUMMY]; // enough to flush cache
  
  memset(buffer, 0, n_msg * 10);
  memset(original, 0, n_msg * 8);
  memset(decoded, 0, n_msg * 8);
  memset(dummy_array, 0, N_DUMMY * 8);

  int n_test = 10;
  for (int t = 0; t < n_test; t++) {    
    gen_data();
    encode_all();
    clear_cache();

    timer.startCounter();
    decode_all();
    double cost = timer.getCounterMsPrecise();
    
    check();
    cout << t << " " << "cost = " << cost << "ms" << std::endl;
  }
}

Compile command:

g++ -o main test_encoding.cpp -std=c++17 -O3 -mavx2

I will update the benchmark if other solutions are provided. Benchmark on a Ryzen 4350G (Zen 2, x86-64), time to decode 10^8 int64_t:

method   | compressed/original size ratio | time cost (ms)
original | 0.341                          | 575
Tags: c++, optimization, encoding, compression, simd
2 Answers

3 votes

This is relatively tricky, but it is still possible to vectorize the decoder. Here is an implementation which, on my computer with an AMD Zen 3 CPU, outperforms your reference version by a factor of 2.8.

#include <immintrin.h>
#include <stdint.h>

// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    __m128i v = src;
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
    v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );

    // So far we have computed inclusive prefix sum
    // Keep the last byte which is total sum of bytes in the vector
    uint16_t tmp = _mm_extract_epi16( v, 7 );
    tmp >>= 8;
    sum = tmp;

    // Subtract original numbers to make exclusive sum = byte offsets to load
    return _mm_sub_epi8( v, src );
}

// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
    const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
    const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
    const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
    const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
    __m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );

    // Mask away the higher pieces in the loaded numbers
    __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}

inline __m256i applySigns( __m256i v, __m128i signs )
{
    // Sign extend the masks from bytes to int64
    __m256i mask = _mm256_cvtepi8_epi64( signs );
    // Conditionally negate the numbers
    __m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    return _mm256_blendv_epi8( v, neg, mask );
}

class BlockDecoder
{
    // Load offsets, in bytes
    __m128i offsetBytes;
    // Payload bits per element, the bytes are in [ 0 .. 64 ] interval
    __m128i payloadBits;
    // 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
    __m128i signs;
    // Count of payload bytes in the complete block
    size_t payloadBytes;

    // Decode the block header
    inline void loadHeader( const uint8_t* rsi )
    {
        // Load 8 bytes, and zero extend them into uint16_t
        const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );

        // Unpack lengths
        const __m128i seven = _mm_set1_epi8( 7 );
        const __m128i l4 = _mm_slli_epi16( v, 4 );
        __m128i lengths = _mm_or_si128( v, l4 );
        lengths = _mm_and_si128( lengths, seven );
        // Transform 7 into 8
        __m128i tmp = _mm_cmpeq_epi8( lengths, seven );
        lengths = _mm_sub_epi8( lengths, tmp );

        // Byte offsets to load 16 numbers, and count of payload bytes in the complete block
        offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
        // Count of payload bits in the loaded numbers, lengths * 8
        payloadBits = _mm_slli_epi16( lengths, 3 );
        // Signs vector, we only use the highest 0x80 bit in these bytes
        signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
    }

    // Decode and store 4 numbers
    template<int slice>
    inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
    {
        uint32_t off;
        __m128i bits, s;
        if constexpr( slice != 0 )
        {
            off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
            constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
            bits = _mm_shuffle_epi32( payloadBits, imm );
            s = _mm_shuffle_epi32( signs, imm );
        }
        else
        {
            off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
            // For the first slice of the block, the 4 lowest bytes are in the correct locations already
            bits = payloadBits;
            s = signs;
        }

        __m256i v = loadAndMask( payload, off, bits );
        v = applySigns( v, s );
        _mm256_storeu_si256( ( __m256i* )rdi, v );
    }

public:

    // Decode and store a block of 16 numbers, return pointer to the next block
    const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
    {
        loadHeader( rsi );
        rsi += 8;

        decodeSlice<0>( rdi, rsi );
        decodeSlice<1>( rdi + 4, rsi );
        decodeSlice<2>( rdi + 8, rsi );
        decodeSlice<3>( rdi + 12, rsi );

        return rsi + payloadBytes;
    }
};

I have tested the code very little, and it may be possible to improve performance further.

As you can see, the implementation requires AVX2 and is branchless.

Usage example:

BlockDecoder dec;
const uint8_t* rsi = buffer;
int64_t* rdi = decoded;
for( size_t i = 0; i < n_msg; i += 16 )
{
    rsi = dec.decode( rdi, rsi );
    rdi += 16;
}

Note! Unless the last number in the stream uses the maximum of 8 bytes, the implementation will usually load a few bytes past the end of the encoded buffer. You can pad the compressed data, implement a special case to decode the final block of the stream, or adjust the encoder so it always emits 8 bytes for the final number of the final block.
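
The padding option is probably the simplest. Applied to the question's allocation, a minimal sketch (the extra 32 bytes are my own safety margin, not part of the answer):

// pad the buffer so the 8-byte loads of the final block stay inside the allocation
buffer = new uint8_t[size_t(n_msg) * 10 + 32];
memset(buffer, 0, size_t(n_msg) * 10 + 32);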

P.S. The initial version used the _mm256_i32gather_epi64 instruction to load the four int64 numbers from memory, using a base pointer plus four byte offsets from another vector. However, on AMD CPUs it is slightly slower than 4 scalar loads. If you are targeting Intel, the initial version may be better; see the edit history.
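
For reference, a gather-based loadAndMask might look roughly like this (my reconstruction, untested, not the original code from the edit history; it expects the slice's 4 byte offsets in the low 4 bytes of offsetBytes):

inline __m256i loadAndMaskGather( const uint8_t* rsi, __m128i offsetBytes, __m128i bits )
{
    // Zero-extend the 4 byte offsets into 32-bit gather indices
    const __m128i vindex = _mm_cvtepu8_epi32( offsetBytes );
    // Gather 4 int64 values from rsi + index, scale = 1 byte
    __m256i r = _mm256_i32gather_epi64( (const long long*)rsi, vindex, 1 );
    // Mask away the bytes beyond each element's payload, as in the scalar-load version
    const __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i mask = _mm256_sllv_epi64( _mm256_set1_epi32( -1 ), shift );
    return _mm256_andnot_si256( mask, r );
}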


2 votes

The scalar implementation can be made largely branchless.

Stuffing the sign bit into the control bits would probably cost some bits in the long run. Instead, here all the control bits are packed at the start of the stream (similar to streamvbyte).

Shift-based (variable) sign extension is used because it is cheaper than zigzag for a byte-wise scalar implementation.
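
As a small illustration (mine, not from the answer), a 2-byte payload loaded left-aligned in a 64-bit register is sign-extended with a single arithmetic shift:

// Illustration only: shift-based sign extension of a 2-byte payload
uint64_t raw = 0xFEDCull << 48;        // payload bytes sit in the top 16 bits of the register
int64_t  val = (int64_t)raw >> 48;     // arithmetic right shift sign-extends: val == -292
// zigzag decoding would instead be (int64_t)(v >> 1) ^ -(int64_t)(v & 1)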

The proof of concept assumes:

  • fast unaligned memory access (and that memcpy is optimized away)
  • little-endian byte order
  • arithmetic right shift of signed values
#include <cstddef>  // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy

unsigned char* encode(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    size_t num_chunks = n >> 3;
    unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    while (num_chunks--) {
        uint64_t key = 0;
        for (size_t i = 0; i < 8; i++) {
            uint64_t v = (uint64_t)*src++;
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian

            v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            v = (v << 8) | v; // combine lengths of 7 and 8
            v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel... 
            data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
            key |= v << (i * 3);
        }
        memcpy(key_ptr, &key, 3); // assumes little endian
        key_ptr += 3;
    }
    // TODO: tail loop...
    
    return dst;
}
    
void decode(size_t n, const unsigned char* src, int64_t* dst) {
    const unsigned char* key_ptr = src;
    size_t num_chunks = n >> 3;
    const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
    if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
        data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian) 
        while (num_chunks--) {
            unsigned keys;
            memcpy(&keys, key_ptr, 3);
            key_ptr += 3;
            for (size_t i = 0; i < 8; i++) {
                uint64_t v;
                size_t k = keys & 0x07;
                size_t len = k + ((k + 1) >> 3); // k==7 is len=8
                uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
                size_t shift = (64 - (len * 8)) & (size_t)mask; 

                keys >>= 3;     
                data_ptr += len; 
                memcpy(&v, data_ptr, sizeof(v));
                v &= mask;
                *dst++ = (int64_t)v >> shift;
            }
        }
        data_ptr += sizeof(uint64_t);
    }

    // TODO: tail loop...
}
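
A possible way to drive this codec with the question's arrays (my own sketch; the buffer size is an assumption, n must be a multiple of 8 since the tail loops are still TODO, and n >= 19 is required by the decode fast path):

// usage sketch (mine, not from the answer)
size_t n = n_msg;                                   // 10^8 in the question's benchmark, a multiple of 8
unsigned char* enc = new unsigned char[n * 9 + 16]; // ~8.4 bytes/value worst case, plus slack for the 8-byte stores/loads
encode(n, original, enc);
decode(n, enc, decoded);
delete[] enc;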