如何用 SIMD 向量最佳地填充多个数组?

问题描述 投票:0回答:1

基本上,我有一个向量数组,我想将其“拆分”为多个具有向量值的数组,但我正在努力寻找最佳方法来执行此操作,因为这是针对性能关键场景的。

Vector256<double>[] input = GetVectors();
double[] X, Y, Z, W;

// how do I fill X, Y, Z, W with their respective values in a performant way?

我已经尝试过 Linq 和 Parallel.For,Linq 很慢并且分配,Parallel.For 快两倍但仍然分配,我觉得它不够快。

我知道数组分配,我没有计算数组分配,而是计算 Linq 和 Parallel.For 本身带来的分配,因此您可以从分配的内存中获取 sizeof(double) * 计数,并且仍然看到它们仍然分配很多。

当 Count = 1000 时,数组的大小加上数组本身的点和头应该是 31.25kb,但 linq 仍然分配 ~0.34kb 和 Parallel.For 分配 ~2.33kb,更高的计数 = 更高的分配,我有点想要尽可能减少,因为数组可以进行内存池化,而 Linq 和 Parallel.For 则不能。

[Benchmark]
public double[][] Linq()
{
    var result = new double[4][];

    result[0] = _array.Select(x => x.X).ToArray();
    result[1] = _array.Select(x => x.Y).ToArray();
    result[2] = _array.Select(x => x.Z).ToArray();
    result[3] = _array.Select(x => x.W).ToArray();

    return result;
}

[Benchmark(Baseline = true)]
public double[][] ParallelFor()
{
    var result = new double[][] 
    { 
        new double[Count], 
        new double[Count], 
        new double[Count],
        new double[Count]
    };

    Parallel.For(0, Count, i =>
    {
        result[0][i] = _array[i].X;
        result[1][i] = _array[i].Y;
        result[2][i] = _array[i].Z;
        result[3][i] = _array[i].W;
    });

    return result;
}
方法 意思是 错误 标准偏差 比率 比率SD 0代 第一代 第二代 已分配 分配比例
Linq 1000 13.259 我们 0.1522 我们 0.1271 我们 1.47 0.02 10.2997 - - 31.59 KB 0.94
并行 1000 8.993 我们 0.0837 我们 0.0699 我们 1.00 0.00 11.1389 - - 33.58 KB 1.00
Linq 10000 136.482 我们 2.7235 我们 3.8179 我们 2.94 0.09 67.3828 33.4473 - 312.84 KB 0.99
并行 10000 46.989 我们 0.6139 我们 0.5742 我们 1.00 0.00 75.0732 36.0718 - 314.83 KB 1.00
Linq 100000 1,877.981 我们 18.9144 我们 15.7944 我们 1.92 0.11 562.5000 562.5000 562.5000 3125.51 KB 1.00
并行 100000 992.898 我们 19.7800 我们 50.7037 我们 1.00 0.00 260.7422 256.8359 256.8359 3130.72 KB 1.00

我有点期待一些使用非托管内存的不安全技巧或一些优化它的东西,或者一些 SIMD 巫术。

.net-core optimization vectorization simd
1个回答
0
投票

因为您似乎想要性能,并且愿意使用不安全的代码。尝试以下 AVX 版本。

using System.Runtime.CompilerServices;
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;

static class SplitLanes
{
    public static double[][] splitLanes( Vector256<double>[] source )
    {
        int length = source.Length;
        if( length <= 0 )
            throw new ArgumentException();

        var result = new double[ 4 ][]
        {
            new double[length],
            new double[length],
            new double[length],
            new double[length]
        };

        unsafe
        {
            fixed( Vector256<double>* sourceF = source )
            fixed( double* xf = result[ 0 ] )
            fixed( double* yf = result[ 1 ] )
            fixed( double* zf = result[ 2 ] )
            fixed( double* wf = result[ 3 ] )
            {
                // Copy source pointers
                // For some weird reason, the pointers returned from fixed statement are immutable. 
                Vector256<double>* rsi = sourceF;
                double* x = xf;
                double* y = yf;
                double* z = zf;
                double* w = wf;
                Vector256<double>* rsiEndAligned = rsi + ( length & -4 );
                Vector256<double>* rsiEnd = rsi + length;

                // Handle majority of the data with AVX
                // Each iteration of this loop loads 4 vectors = 16 elements,
                // and stores a full vector into each output pointer
                for( ; rsi < rsiEndAligned; rsi += 4, x += 4, y += 4, z += 4, w += 4 )
                    transpose4x4( rsi, x, y, z, w );

                // Handle the remainder
                // Each iteration of this loop loads 4 elements,
                // and stores a single number into each output pointer
                for( ; rsi < rsiEnd; rsi++, x++, y++, z++, w++ )
                    transpose4x1( rsi, x, y, z, w );
            }
        }
        return result;
    }

    /// <summary>Create AVX vector by loading 2 FP64 numbers from 2 pointers each</summary>
    [MethodImpl( MethodImplOptions.AggressiveInlining )]
    static unsafe Vector256<double> load2( double* a, double* b )
    {
        Vector128<double> low = Vector128.Load( a );
        Vector256<double> result = Vector128.ToVector256Unsafe( low );
        Vector128<double> high = Vector128.Load( b );
        return Avx.InsertVector128( result, high, 1 );
    }

    /// <summary>Load 16 numbers from source pointer, transpose, and store 4 rows</summary>
    [MethodImpl( MethodImplOptions.AggressiveInlining )]
    static unsafe void transpose4x4( Vector256<double>* source,
        double* x, double* y, double* z, double* w )
    {
        double* rsi = (double*)source;
        // x0, y0, x2, z2
        Vector256<double> a = load2( rsi, rsi + 8 );
        // z0, w0, z2, w2
        Vector256<double> b = load2( rsi + 2, rsi + 10 );
        // x1, y1, x3, y3
        Vector256<double> c = load2( rsi + 4, rsi + 12 );
        // z1, w1, z4, w4 
        Vector256<double> d = load2( rsi + 6, rsi + 14 );

        Vector256<double> v;
        v = Avx.UnpackLow( a, c );
        v.Store( x );
        v = Avx.UnpackHigh( a, c );
        v.Store( y );
        v = Avx.UnpackLow( b, d );
        v.Store( z );
        v = Avx.UnpackHigh( b, d );
        v.Store( w );
    }

    /// <summary>Load 4 numbers from the source pointer, and store 4 lanes</summary>
    [MethodImpl( MethodImplOptions.AggressiveInlining )]
    static unsafe void transpose4x1( Vector256<double>* source,
        double* x, double* y, double* z, double* w )
    {
        double* rsi = (double*)source;
        Vector128<double> v;
        v = Vector128.Load( rsi );
        Sse2.StoreScalar( x, v );
        Sse2.StoreHigh( y, v );
        v = Vector128.Load( rsi + 2 );
        Sse2.StoreScalar( z, v );
        Sse2.StoreHigh( w, v );
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.