我有一个向量数组,想把它按分量“拆分”成多个数组(每个数组分别存放所有向量的 X、Y、Z、W 分量)。由于这是性能关键的场景,我正在寻找最高效的实现方式。
// Source data: an array of 4-lane double vectors returned by GetVectors().
Vector256<double>[] input = GetVectors();
// Destination arrays, one per lane; declared here but not yet assigned.
double[] X, Y, Z, W;
// Question: how can X, Y, Z and W be filled with the matching lane of every vector in a performant way?
我已经尝试过 Linq 和 Parallel.For:Linq 很慢,并且会产生额外的内存分配;Parallel.For 快了大约一倍,但同样会分配内存,而且我觉得它仍然不够快。
我知道结果数组本身需要分配内存,但我关注的不是这部分,而是 Linq 和 Parallel.For 自身带来的额外分配。也就是说,即使从“已分配”的内存中减去结果数组本身占用的部分(每个数组 sizeof(double) * Count),仍然可以看到它们分配了不少内存。
当 Count = 1000 时,数组数据加上数组自身的指针和对象头大约应为 31.25 KB,但 Linq 仍然额外分配了约 0.34 KB,Parallel.For 额外分配了约 2.33 KB;Count 越大,额外分配越多。我希望尽可能减少这部分开销,因为数组可以通过内存池复用,而 Linq 和 Parallel.For 内部的分配则无法池化。
/// <summary>
/// Benchmark: builds each component array with its own LINQ projection over <c>_array</c>.
/// </summary>
/// <returns>Four arrays in the order X, Y, Z, W.</returns>
[Benchmark]
public double[][] Linq()
{
    // One full pass over _array per component; query syntax compiles to Select(...).
    double[] xs = (from v in _array select v.X).ToArray();
    double[] ys = (from v in _array select v.Y).ToArray();
    double[] zs = (from v in _array select v.Z).ToArray();
    double[] ws = (from v in _array select v.W).ToArray();

    return new double[][] { xs, ys, zs, ws };
}
/// <summary>
/// Baseline benchmark: pre-allocates the four component arrays and fills them
/// with a <c>Parallel.For</c> over the indices <c>0 .. Count - 1</c>.
/// </summary>
/// <returns>Four arrays of length <c>Count</c> in the order X, Y, Z, W.</returns>
[Benchmark(Baseline = true)]
public double[][] ParallelFor()
{
    var lanes = new double[4][];
    for (int lane = 0; lane < lanes.Length; lane++)
    {
        lanes[lane] = new double[Count];
    }

    Parallel.For(0, Count, index =>
    {
        // Each index is written by exactly one iteration, so no synchronization is used.
        var element = _array[index];
        lanes[0][index] = element.X;
        lanes[1][index] = element.Y;
        lanes[2][index] = element.Z;
        lanes[3][index] = element.W;
    });

    return lanes;
}
Method | Count | Mean | Error | StdDev | Ratio | RatioSD | Gen0 | Gen1 | Gen2 | Allocated | Alloc Ratio |
---|---|---|---|---|---|---|---|---|---|---|---|
Linq | 1000 | 13.259 us | 0.1522 us | 0.1271 us | 1.47 | 0.02 | 10.2997 | - | - | 31.59 KB | 0.94 |
ParallelFor | 1000 | 8.993 us | 0.0837 us | 0.0699 us | 1.00 | 0.00 | 11.1389 | - | - | 33.58 KB | 1.00 |
Linq | 10000 | 136.482 us | 2.7235 us | 3.8179 us | 2.94 | 0.09 | 67.3828 | 33.4473 | - | 312.84 KB | 0.99 |
ParallelFor | 10000 | 46.989 us | 0.6139 us | 0.5742 us | 1.00 | 0.00 | 75.0732 | 36.0718 | - | 314.83 KB | 1.00 |
Linq | 100000 | 1,877.981 us | 18.9144 us | 15.7944 us | 1.92 | 0.11 | 562.5000 | 562.5000 | 562.5000 | 3125.51 KB | 1.00 |
ParallelFor | 100000 | 992.898 us | 19.7800 us | 50.7037 us | 1.00 | 0.00 | 260.7422 | 256.8359 | 256.8359 | 3130.72 KB | 1.00 |
我期待的答案大概是利用非托管内存的 unsafe 技巧、其他能优化这一操作的手段,或者某种 SIMD“魔法”。
既然您追求性能,并且愿意使用 unsafe 代码,可以试试下面的 AVX 版本。
using System;
using System.Runtime.CompilerServices;
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;
/// <summary>
/// Splits an array of 4-lane <see cref="Vector256{T}"/> of double into four per-lane arrays
/// (array-of-structures to structure-of-arrays). Uses AVX when the CPU supports it and a
/// portable scalar loop otherwise.
/// </summary>
static class SplitLanes
{
    /// <summary>Split every vector of <paramref name="source"/> into its four lanes.</summary>
    /// <param name="source">Vectors to split; must be non-null and contain at least one element.</param>
    /// <returns>
    /// Four arrays of length <c>source.Length</c>; <c>result[k][i]</c> holds lane <c>k</c> of <c>source[i]</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="source"/> is empty.</exception>
    public static double[][] splitLanes( Vector256<double>[] source )
    {
        ArgumentNullException.ThrowIfNull( source );

        int length = source.Length;
        if( length <= 0 )
            throw new ArgumentException( "The source array must contain at least one vector.", nameof( source ) );

        var result = new double[ 4 ][]
        {
            new double[length],
            new double[length],
            new double[length],
            new double[length]
        };

        // The intrinsics below throw PlatformNotSupportedException on CPUs without AVX,
        // so fall back to a plain per-element copy there.
        if( !Avx.IsSupported )
        {
            splitLanesScalar( source, result );
            return result;
        }

        unsafe
        {
            fixed( Vector256<double>* sourceF = source )
            fixed( double* xf = result[ 0 ] )
            fixed( double* yf = result[ 1 ] )
            fixed( double* zf = result[ 2 ] )
            fixed( double* wf = result[ 3 ] )
            {
                // Locals declared by a fixed statement are read-only,
                // so copy them into mutable pointers that the loops can advance.
                Vector256<double>* rsi = sourceF;
                double* x = xf;
                double* y = yf;
                double* z = zf;
                double* w = wf;

                // length & -4 rounds the length down to a multiple of 4
                Vector256<double>* rsiEndAligned = rsi + ( length & -4 );
                Vector256<double>* rsiEnd = rsi + length;

                // Handle majority of the data with AVX
                // Each iteration of this loop loads 4 vectors = 16 elements,
                // and stores a full vector into each output pointer
                for( ; rsi < rsiEndAligned; rsi += 4, x += 4, y += 4, z += 4, w += 4 )
                    transpose4x4( rsi, x, y, z, w );

                // Handle the remainder
                // Each iteration of this loop loads 1 vector = 4 elements,
                // and stores a single number into each output pointer
                for( ; rsi < rsiEnd; rsi++, x++, y++, z++, w++ )
                    transpose4x1( rsi, x, y, z, w );
            }
        }
        return result;
    }

    /// <summary>Portable fallback without intrinsics: copies each lane with <c>GetElement</c>.</summary>
    static void splitLanesScalar( Vector256<double>[] source, double[][] result )
    {
        double[] x = result[ 0 ];
        double[] y = result[ 1 ];
        double[] z = result[ 2 ];
        double[] w = result[ 3 ];
        for( int i = 0; i < source.Length; i++ )
        {
            Vector256<double> v = source[ i ];
            x[ i ] = v.GetElement( 0 );
            y[ i ] = v.GetElement( 1 );
            z[ i ] = v.GetElement( 2 );
            w[ i ] = v.GetElement( 3 );
        }
    }

    /// <summary>Create AVX vector by loading 2 FP64 numbers from 2 pointers each</summary>
    /// <remarks>The two numbers at <paramref name="a"/> fill the low half, the two at <paramref name="b"/> the high half.</remarks>
    [MethodImpl( MethodImplOptions.AggressiveInlining )]
    static unsafe Vector256<double> load2( double* a, double* b )
    {
        Vector128<double> low = Vector128.Load( a );
        // The upper half is left undefined here; it is overwritten by the insert below.
        Vector256<double> result = Vector128.ToVector256Unsafe( low );
        Vector128<double> high = Vector128.Load( b );
        return Avx.InsertVector128( result, high, 1 );
    }

    /// <summary>Load 16 numbers from source pointer, transpose, and store 4 rows</summary>
    /// <remarks>
    /// Reads 4 consecutive vectors (x0..w0, x1..w1, x2..w2, x3..w3) and writes
    /// 4 numbers to each of the output pointers.
    /// </remarks>
    [MethodImpl( MethodImplOptions.AggressiveInlining )]
    static unsafe void transpose4x4( Vector256<double>* source,
        double* x, double* y, double* z, double* w )
    {
        double* rsi = (double*)source;
        // x0, y0, x2, y2
        Vector256<double> a = load2( rsi, rsi + 8 );
        // z0, w0, z2, w2
        Vector256<double> b = load2( rsi + 2, rsi + 10 );
        // x1, y1, x3, y3
        Vector256<double> c = load2( rsi + 4, rsi + 12 );
        // z1, w1, z3, w3
        Vector256<double> d = load2( rsi + 6, rsi + 14 );

        // Unpack interleaves within each 128-bit half, which yields the lanes in index order.
        Vector256<double> v;
        // x0, x1, x2, x3
        v = Avx.UnpackLow( a, c );
        v.Store( x );
        // y0, y1, y2, y3
        v = Avx.UnpackHigh( a, c );
        v.Store( y );
        // z0, z1, z2, z3
        v = Avx.UnpackLow( b, d );
        v.Store( z );
        // w0, w1, w2, w3
        v = Avx.UnpackHigh( b, d );
        v.Store( w );
    }

    /// <summary>Load 4 numbers from the source pointer, and store 4 lanes</summary>
    /// <remarks>Used for the tail of the input that does not fill a whole group of 4 vectors.</remarks>
    [MethodImpl( MethodImplOptions.AggressiveInlining )]
    static unsafe void transpose4x1( Vector256<double>* source,
        double* x, double* y, double* z, double* w )
    {
        double* rsi = (double*)source;
        Vector128<double> v;
        // Low half of the vector: x, y
        v = Vector128.Load( rsi );
        Sse2.StoreScalar( x, v );
        Sse2.StoreHigh( y, v );
        // High half of the vector: z, w
        v = Vector128.Load( rsi + 2 );
        Sse2.StoreScalar( z, v );
        Sse2.StoreHigh( w, v );
    }
}