我有这样一种编码方法:把每 16 个 int64_t 组成的块编码为 16 个标志半字节(nibble,打包成 8 个字节),随后为每个输入的 int64_t 存放 1 个或多个字节的有效负载:
- sign(value) 存储在标志中;
- abs(value) 存储在缓冲区中,占用的字节数可变。
此示例基准测试经过简化。我已经用实际数据测试了我的用例:它比 LEB128(又名 varint)和 lz4 更快,而且压缩得更好。那么让我们重点关注这种编码方法或类似的方法。
如何改进这段代码? 我最关心的是解压速度。如果可以提高速度,可以对编码/解码格式(或其他格式)进行一些更改。我想知道在这种情况下是否可以使用 SIMD。
特别提示:
- 在本示例基准中,abs(value) 的长度大多 <= 2 字节。
- 在我的实际用例中,大多数 abs(value) 可以用 <= 7 个字节表示。
- 同样,在 99% 的情况下,连续 16 个 abs(value) 总共可以用 <= 32 个字节表示。
代码如下(主要函数是 encode 和 decode_all):
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <string>
#include <vector>
using namespace std;
namespace {
/// Minimal stopwatch used by the benchmark.
///
/// Time is read from std::chrono::steady_clock, which is monotonic: a
/// wall-clock adjustment (NTP step, manual change) between startCounter()
/// and a getCounter*() call cannot produce a negative or inflated interval,
/// which std::chrono::system_clock does not guarantee.
class MyTimer {
    using Clock = std::chrono::steady_clock;

    // Reference instant; initialised at construction so that reading the
    // timer before startCounter() still yields a small, sane interval.
    Clock::time_point start = Clock::now();

    /// Time elapsed since the last startCounter() (or since construction).
    Clock::duration elapsed() const {
        return Clock::now() - start;
    }

public:
    /// Restarts the stopwatch at the current instant.
    void startCounter() {
        start = Clock::now();
    }

    /// @return elapsed time in whole nanoseconds.
    int64_t getCounterNs() const {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed()).count();
    }

    /// @return elapsed time in whole milliseconds (truncated).
    int64_t getCounterMs() const {
        return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed()).count();
    }

    /// @return elapsed time in milliseconds with nanosecond resolution.
    double getCounterMsPrecise() const {
        return std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed()).count()
               / 1000000.0;
    }
};
}
/// Appends the signed LEB128 encoding of `value` to `data`.
///
/// @param data  Output byte array; must have room for the encoding
///              (at most ceil(bits(T) / 7) bytes).
/// @param value Value to encode.
/// @param idx   In/out write position in `data`; advanced past the bytes
///              written.
template <typename T>
void leb128_encode(uint8_t* data, T value, int &idx)
{
    constexpr int size = sizeof(value) * 8;
    const bool negative = value < 0;
    // Mask with the top 7 bits set, used to re-insert the sign bits after each
    // right shift. Built from an unsigned constant because left-shifting a
    // negative signed value (the previous `~0LL << n`) is undefined in C++17.
    const T sign_fill = static_cast<T>(~0ULL << (size - 7));

    for (;;) {
        uint8_t byte = value & 0x7F;
        value >>= 7;
        // Make the shift arithmetic regardless of how the implementation
        // shifts negative values.
        if (negative) value |= sign_fill;

        // The encoding ends once the remaining bits are pure sign extension
        // of bit 6 of the byte just produced.
        const bool sign_bit_set = (byte & 0x40) != 0;
        const bool done = (value == 0 && !sign_bit_set) || (value == -1 && sign_bit_set);
        if (!done) byte |= 0x80; // continuation flag
        data[idx++] = byte;
        if (done) break;
    }
}
/// Decodes one signed LEB128 value from `data`.
///
/// @param data  Input byte array.
/// @param value Receives the decoded value.
/// @param idx   In/out read position in `data`; advanced past the bytes
///              consumed (up to and including the first byte without the
///              0x80 continuation flag).
///
/// No bounds checking is performed: the caller must guarantee that a
/// terminating byte exists within `data`.
template <typename T>
void leb128_decode(uint8_t* data, T &value, int &idx)
{
    constexpr int size = sizeof(value) * 8;
    constexpr int acc_bits = 64;

    // Accumulate in an unsigned 64-bit integer: shifting payload bits into or
    // past bit 63 of a signed integer (as happens for 10-byte encodings such
    // as INT64_MIN) is undefined behaviour.
    uint64_t bits = 0;
    int shift = 0;
    uint8_t byte;
    do {
        byte = data[idx++];
        // Payload beyond 64 bits cannot be represented; drop it instead of
        // performing an out-of-range shift.
        if (shift < acc_bits) bits |= static_cast<uint64_t>(byte & 0x7F) << shift;
        shift += 7;
    } while (byte & 0x80);

    // Sign-extend from bit 6 of the last byte when the value did not already
    // fill the whole destination type.
    if (shift < size && shift < acc_bits && (byte & 0x40)) {
        bits |= ~static_cast<uint64_t>(0) << shift;
    }
    value = static_cast<T>(bits);
}
// ---- Benchmark configuration ----
// Number of int64_t values generated, encoded and decoded per run.
int n_msg = 100'000'000;
// Number of values per encoded block (each block starts with BLK_SIZE / 2
// bytes of packed 4-bit flags).
constexpr int BLK_SIZE = 16;

// ---- Working arrays (allocated in main) ----
uint8_t* buffer = nullptr;      // encoded byte stream
int64_t* original = nullptr;    // input values produced by gen_data()
int64_t* decoded = nullptr;     // output of decode_all(), compared by check()
int64_t* dummy_array = nullptr; // scratch data touched by clear_cache()
// Encodes one value in sign/magnitude form with a variable number of bytes.
//
// The magnitude is appended to the global `buffer` least-significant byte
// first; `idx` is the write position and is advanced past the bytes written.
// `flag` receives the 4-bit descriptor of the value:
//   bits 0-2: payload length code (0..6 = that many bytes, 7 = 8 bytes)
//   bit  3  : sign (1 means negative)
void encode(int64_t value, uint8_t &flag, size_t &idx)
{
    const bool negative = value < 0;

    // Take the magnitude in unsigned arithmetic. Unlike abs(), this does not
    // depend on which abs overload happens to be visible (the C `abs(int)`
    // would truncate) and is well defined for INT64_MIN, whose magnitude 2^63
    // does not fit in int64_t.
    uint64_t magnitude = static_cast<uint64_t>(value);
    if (negative) magnitude = 0 - magnitude;

    uint8_t byte_cnt = 0;
    while (magnitude != 0) {
        buffer[idx++] = static_cast<uint8_t>(magnitude & 0xFF); // lowest byte
        magnitude >>= 8;
        ++byte_cnt;
    }

    // Only 3 bits are available for 9 possible lengths (0..8), so lengths 7
    // and 8 (the rarest) share code 7 and are both stored as 8 bytes.
    if (byte_cnt == 7) {
        buffer[idx++] = 0; // pad the 7-byte magnitude to 8 bytes
    } else if (byte_cnt == 8) {
        byte_cnt = 7;      // code 7 stands for an 8-byte payload
    }

    flag = byte_cnt;
    if (negative) flag |= 0b1000;
}
void __attribute__ ((noinline)) encode_all()
{
size_t idx = 0;
// encode in blocks of 16 values
for (size_t i = 0; i < n_msg; i += 16) {
size_t flag_idx = idx;
idx += 8; // first 8 bytes of a block are for sign/length flags
for (int t = 0; t < BLK_SIZE; t += 2) {
uint8_t low_flag, high_flag;
encode(original[i + t], low_flag, idx);
encode(original[i + t + 1], high_flag, idx);
buffer[flag_idx + t / 2] = (high_flag << 4) | low_flag;
}
}
cout << "Compression ratio = " << (double(idx) / (n_msg * 8)) << "\n";
}
// Splits one packed flag byte into its two 4-bit descriptors.
// The low nibble describes the first value and the high nibble the second.
// In each nibble, bit 3 is the sign (non-zero means negative, returned as 0
// or 8) and bits 0-2 are the length code (0..7).
template <typename T>
void extract_flag(uint8_t flag, T &low_sign, T &low_length, T &high_sign, T &high_length)
{
    constexpr int kSignBit = 0x8;
    constexpr int kLengthBits = 0x7;

    const int low_nibble = flag;       // upper bits are removed by the masks
    const int high_nibble = flag >> 4;

    low_length = low_nibble & kLengthBits;
    low_sign = low_nibble & kSignBit;
    high_length = high_nibble & kLengthBits;
    high_sign = high_nibble & kSignBit;
}
void __attribute__ ((noinline)) decode_all()
{
static constexpr int64_t mult[] = {
1, 1LL << 8, 1LL << 16, 1LL << 24, 1LL << 32, 1LL << 40, 1LL << 48, 1LL << 56
};
size_t idx = 0, num_idx = 0;
for (size_t i = 0; i < n_msg; i += 16) {
// first 8 bytes of a block are flags
int signs[BLK_SIZE], lens[BLK_SIZE];
for (int t = 0; t < BLK_SIZE; t += 2) {
extract_flag(buffer[idx], signs[t], lens[t], signs[t + 1], lens[t + 1]);
idx++;
}
for (int t = 0; t < BLK_SIZE; t++) if (lens[t] == 7) lens[t] = 8; // special case
// decode BLK_SIZE values
for (int t = 0; t < BLK_SIZE; t++) {
int64_t value = 0;
for (int i = 0; i < lens[t]; i++) value += mult[i] * buffer[idx + i];
if (signs[t]) value = -value;
decoded[num_idx + t] = value;
idx += lens[t];
}
num_idx += BLK_SIZE;
}
}
// Fills original[0 .. n_msg) with pseudo-random test data drawn from rand():
// roughly 1 value in 100 is a large multiple of 10^9, the others lie in
// [0, 70000); roughly 1 value in 3 is then negated.
void __attribute__ ((noinline)) gen_data()
{
    for (size_t i = 0; i < n_msg; ++i) {
        // The three rand() draws happen in this fixed order: size class,
        // magnitude, sign.
        const bool is_large = (rand() % 100 == 0);
        int64_t sample = is_large ? int64_t(rand()) * 1'000'000'000
                                  : int64_t(rand() % 70'000);
        if (rand() % 3 == 0) sample = -sample;
        original[i] = sample;
    }
}
// Verifies that decoded[] matches original[] element by element. On the first
// mismatch, prints the index and both values and terminates with exit code 1.
void __attribute__ ((noinline)) check()
{
    for (size_t i = 0; i < n_msg; i++) {
        if (original[i] == decoded[i]) continue;
        cout << "Wrong at " << i << " " << original[i] << " " << decoded[i] << "\n";
        exit(1);
    }
}
// Volatile so that the reads and writes in clear_cache() cannot be optimised
// away.
int64_t volatile dummy = 42;
// Number of int64_t elements in dummy_array.
constexpr int N_DUMMY = 8'000'000;

// Touches all of dummy_array (fill, sort, one random read) between encoding
// and decoding; main allocates the array with the note "enough to flush cache".
void __attribute__ ((noinline)) clear_cache()
{
    int64_t* const first = dummy_array;
    int64_t* const last = dummy_array + N_DUMMY;

    // One volatile read of `dummy` per element, as a plain assignment loop.
    for (int64_t* p = first; p != last; ++p) *p = dummy;
    std::sort(first, last);
    dummy = first[rand() % N_DUMMY];
}
int main()
{
srand(time(NULL));
MyTimer timer;
buffer = new uint8_t[n_msg * 10];
original = new int64_t[n_msg];
decoded = new int64_t[n_msg];
dummy_array = new int64_t[N_DUMMY]; // enough to flush cache
memset(buffer, 0, n_msg * 10);
memset(original, 0, n_msg * 8);
memset(decoded, 0, n_msg * 8);
memset(dummy_array, 0, N_DUMMY * 8);
int n_test = 10;
for (int t = 0; t < n_test; t++) {
gen_data();
encode_all();
clear_cache();
timer.startCounter();
decode_all();
double cost = timer.getCounterMsPrecise();
check();
cout << t << " " << "cost = " << cost << "ms" << std::endl;
}
}
编译命令:
g++ -o main test_encoding.cpp -std=c++17 -O3 -mavx2
如果有人提供其他解决方案,我会更新基准结果。以下是在 Ryzen 4350G(Zen 2,x86-64)上解码 10^8 个 int64_t 的基准测试结果:
method | compressed/original size ratio | time cost (ms)
original | 0.341 | 575
这相对棘手,但仍然可以对解码器进行矢量化。这是一个在我的计算机上使用 AMD Zen3 CPU 的实现,其性能比您的参考版本高出 2.8 倍。
#include <immintrin.h>
#include <stdint.h>
// Exclusive prefix sum over the 16 unsigned bytes of `src`.
// Output byte i is the sum of input bytes 0 .. i-1, and `sum` receives the
// total of all 16 bytes.
// All arithmetic is 8-bit, so the results are garbage once the total exceeds
// 0xFF; that is fine here because the inputs are in the [0..8] interval.
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
    // Hillis-Steele scan: after the steps with byte distances 1, 2, 4 and 8,
    // every lane holds the inclusive prefix sum.
    // https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
    const __m128i step1 = _mm_add_epi8( src, _mm_slli_si128( src, 1 ) );
    const __m128i step2 = _mm_add_epi8( step1, _mm_slli_si128( step1, 2 ) );
    const __m128i step4 = _mm_add_epi8( step2, _mm_slli_si128( step2, 4 ) );
    const __m128i inclusive = _mm_add_epi8( step4, _mm_slli_si128( step4, 8 ) );

    // The last byte of the inclusive scan is the total; it is the upper half
    // of the highest 16-bit lane.
    const unsigned topLane = (unsigned)_mm_extract_epi16( inclusive, 7 );
    sum = ( topLane >> 8 ) & 0xFFu;

    // Subtracting the inputs turns the inclusive scan into an exclusive one,
    // i.e. into the byte offset at which each element starts.
    return _mm_sub_epi8( inclusive, src );
}
// Loads 4 uint64 numbers from ( rsi + offsets[ i ] ) and keeps only the lowest
// bits[ i ] bits of each.
//   rsi     - start of the block's payload
//   offsets - four byte offsets packed into one uint32_t, lowest byte first
//   bits    - the 4 lowest bytes hold the payload bit count of each number,
//             in the [ 0 .. 64 ] interval
// Every load reads 8 bytes regardless of the payload length, so up to 8 bytes
// past the last payload byte must be readable.
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Four scalar loads instead of an AVX2 gather. The payload is a byte
    // stream with no alignment guarantee, so the values are read with memcpy:
    // dereferencing a casted int64_t pointer would be a misaligned,
    // aliasing-violating access. Compilers turn each memcpy into a single
    // 8-byte load.
    int64_t n0, n1, n2, n3;
    std::memcpy( &n0, rsi + (uint8_t)offsets, sizeof( n0 ) );
    std::memcpy( &n1, rsi + (uint8_t)( offsets >> 8 ), sizeof( n1 ) );
    std::memcpy( &n2, rsi + (uint8_t)( offsets >> 16 ), sizeof( n2 ) );
    std::memcpy( &n3, rsi + ( offsets >> 24 ), sizeof( n3 ) );
    const __m256i r = _mm256_setr_epi64x( n0, n1, n2, n3 );

    // Mask away the bytes that belong to the following numbers:
    // ( ~0 << count ) has ones exactly in the bits to drop. A shift count of
    // 64 yields 0 with this instruction, which keeps the whole number.
    const __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i ones = _mm256_set1_epi32( -1 );
    const __m256i mask = _mm256_sllv_epi64( ones, shift );
    return _mm256_andnot_si256( mask, r );
}
// Negates the 64-bit lanes of `v` whose sign byte has the 0x80 bit set.
// Only the 4 lowest bytes of `signs` are used, one per lane.
inline __m256i applySigns( __m256i v, __m128i signs )
{
    // 0 - v for every lane; selected below only where requested
    const __m256i negated = _mm256_sub_epi64( _mm256_setzero_si256(), v );
    // Sign-extending each byte to 64 bits replicates its 0x80 bit into the top
    // bit of all 8 bytes of the lane, which is what the byte-wise blend tests
    const __m256i select = _mm256_cvtepi8_epi64( signs );
    return _mm256_blendv_epi8( v, negated, select );
}
// Decodes one block of 16 sign/magnitude encoded int64 numbers at a time.
// Block layout: 8 header bytes, each packing two 4-bit flags (low nibble first;
// bits 0-2 = length code, bit 3 = sign), followed by the variable-length payloads.
// decode() first unpacks the header into the member vectors below, then
// decodes the payload in 4 slices of 4 numbers each.
class BlockDecoder
{
// Load offsets, in bytes: byte i is the offset of number i from the start of the payload
__m128i offsetBytes;
// Payload bits per element, the bytes are in [ 0 .. 64 ] interval
__m128i payloadBits;
// 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
__m128i signs;
// Count of payload bytes in the complete block
size_t payloadBytes;
// Decode the block header: reads the 8 flag bytes at rsi and fills all four members above
inline void loadHeader( const uint8_t* rsi )
{
// Load 8 bytes, and zero extend them into uint16_t
// Each 16-bit lane is now 0x00HL, where L / H are the low / high nibbles of a header byte
const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );
// Unpack lengths
const __m128i seven = _mm_set1_epi8( 7 );
// Shifting the lane left by 4 bits moves the high nibble H into the upper byte of the lane
const __m128i l4 = _mm_slli_epi16( v, 4 );
__m128i lengths = _mm_or_si128( v, l4 );
// After masking with 7, the low byte of each lane is the length code of L and the high byte that of H
lengths = _mm_and_si128( lengths, seven );
// Transform 7 into 8
// The comparison yields 0xFF ( = -1 ) for bytes equal to 7, and subtracting -1 adds 1
__m128i tmp = _mm_cmpeq_epi8( lengths, seven );
lengths = _mm_sub_epi8( lengths, tmp );
// Byte offsets to load 16 numbers, and count of payload bytes in the complete block
offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
// Count of payload bits in the loaded numbers, lengths * 8
// The 16-bit shift is safe: lengths are at most 8, so nothing spills into the neighbouring byte
payloadBits = _mm_slli_epi16( lengths, 3 );
// Signs vector, we only use the highest 0x80 bit in these bytes
// Low byte of each lane: sign bit of L moved to 0x80 by l4; high byte: v shifted left by 8 bits has the sign of H at 0x80
signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
}
// Decode and store 4 numbers: elements [ slice * 4 .. slice * 4 + 3 ] of the block
// rdi is the destination of these 4 numbers, payload is the first payload byte of the block
template<int slice>
inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
{
// Four byte offsets of this slice packed into one integer
uint32_t off;
__m128i bits, s;
if constexpr( slice != 0 )
{
off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
// Broadcast the 32-bit lane of this slice, so its 4 bytes land in the lowest 4 bytes of the vector
constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
bits = _mm_shuffle_epi32( payloadBits, imm );
s = _mm_shuffle_epi32( signs, imm );
}
else
{
off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
// For the first slice of the block, the 4 lowest bytes are in the correct locations already
bits = payloadBits;
s = signs;
}
// Load the 4 magnitudes, cut them to their payload length, then negate the ones flagged negative
__m256i v = loadAndMask( payload, off, bits );
v = applySigns( v, s );
_mm256_storeu_si256( ( __m256i* )rdi, v );
}
public:
// Decode and store a block of 16 numbers, return pointer to the next block
// rdi receives 16 values, rsi points to the 8 header bytes of the block
const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
{
loadHeader( rsi );
// Skip the header; rsi now points to the first payload byte
rsi += 8;
decodeSlice<0>( rdi, rsi );
decodeSlice<1>( rdi + 4, rsi );
decodeSlice<2>( rdi + 8, rsi );
decodeSlice<3>( rdi + 12, rsi );
return rsi + payloadBytes;
}
};
我对代码的测试很少,还有可能进一步提高性能。
如您所见,该实现需要 AVX2,并且是无分支的。
使用示例:
// Decode the whole stream block by block: every call consumes one encoded
// block from rsi and writes 16 numbers to rdi.
BlockDecoder dec;
const uint8_t* rsi = buffer;
int64_t* rdi = decoded;
for( size_t i = 0; i < n_msg; i += 16 )
{
// decode() returns the address of the next block
rsi = dec.decode( rdi, rsi );
rdi += 16;
}
注意!通常,当流中的最后一个数字没有用满最大的 8 个字节时,该实现会读取编码缓冲区末尾之后的几个字节。您可以对压缩数据进行填充,或者为流的最后一个块实现特殊处理,或者调整编码器,使其始终为最后一个块的最后一个数字输出 8 个字节。
附注:初始版本使用 _mm256_i32gather_epi64 指令从内存加载四个 int64 数字,地址由基指针加上另一个向量中的四个字节偏移量给出。然而,在 AMD CPU 上,它比 4 次标量加载稍慢。如果您的目标平台是 Intel,使用初始版本可能更好,请参阅编辑历史记录。
标量实现在很大程度上可以是无分支的。
从长远来看,用符号位填充控制位可能会花费一些位。 相反,这里所有控制位都打包在流的开头(类似于streamvbyte)。
使用变量符号扩展是因为对于字节标量实现来说,它比 zigzag 便宜。
概念验证代码(代码基于若干假设,例如注释中注明的小端字节序):
#include <cstddef> // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy
// Stream layout shared by encode() and decode():
//   [ key stream ][ data stream ]
// The key stream holds one 3-bit key per value, packed little endian, 8 keys
// (3 bytes) per chunk; a final partial chunk of n % 8 values takes
// ceil( 3 * (n % 8) / 8 ) bytes. The data stream holds the truncated
// two's-complement values, low byte first. Key k means k payload bytes,
// except k == 7, which means 8 bytes.

// Size in bytes of the key stream for `n` values.
static inline size_t key_stream_bytes(size_t n) {
    return ((n >> 3) * 3) + ((((n & 0x7) * 3) + 7) >> 3);
}

// Encodes `n` values from `src` into `dst` and returns `dst`.
//
// Every value is stored with a full 8-byte write before the write position
// advances by its real length, so `dst` must provide room for the worst case,
// key_stream_bytes(n) + 8 * n bytes. Assumes a little-endian target.
//
// NOTE(review): the return value is the start of the stream, as before, so
// the encoded size is not reported to the caller.
unsigned char* encode(size_t n, const int64_t* src, unsigned char* dst) {
    unsigned char* key_ptr = dst;
    unsigned char* data_ptr = dst + key_stream_bytes(n);

    size_t remaining = n;
    while (remaining != 0) {
        // 8 values per chunk; the final chunk may be shorter.
        const size_t count = remaining < 8 ? remaining : 8;
        uint64_t key = 0;
        for (size_t i = 0; i < count; i++) {
            const uint64_t v = static_cast<uint64_t>(*src++);
            memcpy(data_ptr, &v, sizeof(v)); // assumes little endian

            uint64_t s = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
            s = (s << 8) | s;         // combine lengths of 7 and 8
            // Index of the highest set bit, divided by 8. The compiler builtin
            // selects BSR or LZCNT itself, so no -mlzcnt flag is needed and
            // the result is correct on CPUs without LZCNT; `| 1` keeps the
            // argument non-zero.
            const uint64_t k = static_cast<uint64_t>(__builtin_clzll(s | 1) ^ 63) >> 3;
            data_ptr += k + ((k + 1) >> 3); // key of 7 is length of 8
            key |= k << (i * 3);
        }
        const size_t key_bytes = ((count * 3) + 7) >> 3; // 3 for a full chunk
        memcpy(key_ptr, &key, key_bytes); // assumes little endian
        key_ptr += key_bytes;
        remaining -= count;
    }
    return dst;
}

// Decodes `count` (at most 8) values whose 3-bit keys are packed in `keys`,
// advancing `data_ptr` and `dst`.
//
// kLoadBackward == true : each value is read with one 8-byte load that ENDS at
//   the end of the value, which leaves the payload left aligned in the
//   register (little endian). Requires at least 8 readable bytes in front of
//   the data stream.
// kLoadBackward == false: exactly `len` bytes are read; slower, but never
//   touches memory outside the value.
template <bool kLoadBackward>
static inline void decode_group(size_t count, unsigned keys,
                                const unsigned char*& data_ptr, int64_t*& dst) {
    for (size_t i = 0; i < count; i++) {
        const size_t k = keys & 0x07;
        const size_t len = k + ((k + 1) >> 3); // k==7 is len=8
        const uint64_t mask = static_cast<uint64_t>(0) - ((k + 7) >> 3); // k ? -1 : 0
        // Masking makes the shift 0 instead of an invalid 64 for empty values.
        const size_t shift = (64 - (len * 8)) & static_cast<size_t>(mask);
        keys >>= 3;

        uint64_t v;
        if constexpr (kLoadBackward) {
            data_ptr += len;
            memcpy(&v, data_ptr - sizeof(v), sizeof(v));
            v &= mask; // an empty value must decode to 0
        } else {
            v = 0;
            memcpy(&v, data_ptr, len);
            data_ptr += len;
            v <<= shift; // left align, as the backward load does
        }
        // The arithmetic right shift sign-extends the truncated value.
        *dst++ = static_cast<int64_t>(v) >> shift;
    }
}

// Decodes all full chunks and then the partial tail chunk, if any.
template <bool kLoadBackward>
static void decode_stream(size_t n, const unsigned char* key_ptr,
                          const unsigned char* data_ptr, int64_t* dst) {
    for (size_t chunks = n >> 3; chunks != 0; --chunks) {
        unsigned keys = 0;
        memcpy(&keys, key_ptr, 3);
        key_ptr += 3;
        decode_group<kLoadBackward>(8, keys, data_ptr, dst);
    }
    const size_t tail = n & 0x7;
    if (tail != 0) {
        unsigned keys = 0;
        memcpy(&keys, key_ptr, ((tail * 3) + 7) >> 3);
        decode_group<kLoadBackward>(tail, keys, data_ptr, dst);
    }
}

// Decodes `n` values from the stream at `src` (as produced by encode()) into
// `dst`. Assumes a little-endian target.
void decode(size_t n, const unsigned char* src, int64_t* dst) {
    const size_t key_bytes = key_stream_bytes(n);
    const unsigned char* data_ptr = src + key_bytes;
    if (key_bytes >= sizeof(uint64_t)) {
        // n >= 19: the key stream provides at least 8 bytes of padding in
        // front of the data, so the fast backward loads stay inside `src`.
        decode_stream<true>(n, src, data_ptr, dst);
    } else {
        decode_stream<false>(n, src, data_ptr, dst);
    }
}