将 64 位整数的范围 [from, to] 转换为伪随机顺序的最快方法,在所有平台上都具有相同的结果?

问题描述 投票:0回答:2

给定一些区间 [a, b] 的索引(64 位无符号整数),我想快速获得一个包含所有这些索引的数组,这些索引根据均匀分布的哈希函数排序,看起来是随机的,但实际上是相同的每个系统,无论使用的 C++ 实现如何。

有点像

vector<uint64_t> distributeIndices(uint64_t from, uint64_t to) {
    unordered_set<uint64_t> uset;
    for (uint64_t i = from; i <= to; i++)
        uset.insert(i);
    return vector<uint64_t>(uset.begin(), uset.end());
}
如果

unordered_set<uint64_t>

 总是在每个实现上使用相同的伪随机分布的哈希函数,则 
会产生期望的结果,但两者都不是这种情况。而且它可能也不是最有效的解决方案(如果我错了请纠正我)。
请注意,
distributeIndices(from, to)
应返回 {from, ..., to} 的随机排列。

因此,答案应该提供一个特定的哈希函数(以及为什么它快速且均匀分布),以及如何有效地利用它来计算

distributeIndices(from, to)
.

再次重申,

distributeIndices(from, to)
无论在何处运行,使用何种编译器,都具有相同的结果,这是至关重要的,这一点必须根据 C++ 标准得到保证。 但是,如果例如
distributeIndices(0,2)
1
分配的索引与
distributeIndices(0,3)
分配的索引不同。

该函数应该在包含数十亿个索引的范围内表现良好。

[ 如果您对为什么这有用感到好奇: 考虑到不同计算节点上有不同的进程,通过消息传递接口进行通信,它们不应该发送实际数据(很大),而只发送它们正在处理的数据条目的索引。同时,处理数据的顺序应该是伪随机的,这样进度速度就不会“弹跳”太多(当沿着有序索引处理时,它确实如此)。这对于可靠预测整个计算将花费多长时间至关重要。因此每个节点都必须知道哪个转换后的索引引用哪个实际索引,即每个节点都必须为

distributeIndices(from, to)
计算相同的结果。 ]

我实际上使用 tbb::concurrent_vector 而不是 std::vector,因此通过 tbb::parallel_for 使用共享内存并行化在执行速度更快同时以某种方式保持哈希函数提供的顺序时是一个优势。用法:

tbb::concurrent_vector<uint64_t> distributeIndices(uint64_t from, uint64_t to) {
    tbb::concurrent_unordered_set<uint64_t> uset;
    tbb::parallel_for(uint64_t(from), uint64_t(to + 1), [&uset](uint64_t i) {
        uset.insert(i);
    });
    return tbb::concurrent_vector<uint64_t>(uset.begin(), uset.end());
}

最快的正确工作解决方案赢得公认的答案。

  • 没有 C/C++ 代码,没有可接受的答案。

我将在我的旧 i7-3610QM 笔记本电脑上使用 GCC 11.3

-O3
测试解决方案,在 1 亿个索引上有 8 个硬件线程(即
distributeIndices(c, c + 99999999)
),并且当未来的答案提供更好的解决方案时,可能会更改已接受的答案。

测试代码(最多运行 10 次,选择最快执行):

int main(int argc, char* argv[]) {
    uint64_t c = atoll(argv[1]);
    uint64_t s = atoll(argv[2]); // 99999999
    for (unsigned i = 0; i < 10; i++) {
        chrono::time_point<chrono::system_clock> startTime = chrono::system_clock::now();
        auto indices = distributeIndices(c, c + s);
        chrono::microseconds dur = chrono::duration_cast<chrono::microseconds>(chrono::system_clock::now() - startTime);
        cout << durationStringMs(dur) << endl;
        // [... some checks ...]
    }
    return 0;
}
  • 结果的存储(例如通过静态变量)显然是不允许的。
  • uint64_t from
    uint64_t to
    不能被认为
    constexpr
    .

我的两个(不合适的)例子是

14482.83 ms (14 s 482.83 ms)
186812.68 ms (3 min 6 s 812.68 ms)

第二种方法看起来非常慢,但仔细检查后,它是我系统上唯一真正伪随机分布值的方法:

  • unordered_set<uint64_t>
    变体:
    99999999, 99999998, 99999997, 99999996, 99999995, ... // 坏
  • tbb::concurrent_unordered_set<uint64_t>
    变体:
    0, 67108864, 33554432, 16777216, 83886080, ... // 好

更好的方法是不必创建向量,而只是以伪随机顺序迭代索引,但这将要求每个伪随机索引都可以从其实际索引中有效地计算(无需迭代之前的所有元素它)。如果这是可能的,我会感到惊讶,但我很乐意感到惊讶。这种方法总是被认为比向量构建方法更好,并且通过迭代 1 亿个索引的持续时间来相互比较。

c++ optimization hash tbb uniform-distribution
2个回答
1
投票

简单的想法:用增量索引填充数组,然后 - 使用自己的、独立于系统的 rand-generator 对其进行洗牌。 您没有请求加密安全排列,在这里我在这个例子中只使用了简单的 LCG。如果你需要保证你的 shuffle 加密安全,我建议你使用 RC4,它在安全性和性能之间取得了很好的平衡。

#include <vector>
#include <algorithm>
#include <stdint.h>
#include <stdio.h>

// LCG params from: https://nuclear.llnl.gov/CNP/rng/rngman/node4.html
std::vector<uint64_t> distributeIndices(uint64_t lo, uint64_t hi) {
    uint64_t size = hi - lo + 1;
    std::vector<uint64_t> vec(size);
    for(uint64_t i = 0; i < size; i++)
        vec[i] = i + lo;
    uint64_t rnd = size ^ 0xBabeCafeFeedDad;
    for(uint64_t i = 0; i < size; i++) {
        rnd = rnd * 2862933555777941757ULL + 3037000493;
        uint64_t j = rnd % size;
        uint64_t tmp = vec[i]; vec[i] = vec[j]; vec[j] = tmp;
    }
    return std::move(vec);
}

int main(int argc, char **argv) {
    uint64_t lo = atoll(argv[1]);
    uint64_t hi = atoll(argv[2]);
   std::vector<uint64_t> vec = distributeIndices(lo, hi);
   for (uint64_t x : vec)
        printf("%lu\n", x);
   return 0;
}

0
投票

最简单的方法是使用 LCG 整个 [0...264) 到自身的唯一映射对其进行排序,如果 LCG 参数受 Hull-Dobell 定理条件约束。好的光谱参数取自 https://arxiv.org/pdf/2001.05304.pdf.

您可以轻松地将其调整为 TBB 向量和并行排序。

沿线

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <numeric>
#include <vector>

#define func auto    

uint64_t m = 0xd1342543de82ef95ULL;
uint64_t c = 0x1ULL;

inline func lcg(uint64_t xi) -> uint64_t { // as LCG as it gets
    return m*xi + c;
}

inline func cmp_lcg(uint64_t a, uint64_t b) -> bool {
    return lcg(a) < lcg(b);
}

func distributeIndices(uint64_t from, uint64_t to) -> std::vector<uint64_t> {
    std::vector<uint64_t> z(to - from + 1);

    std::iota(z.begin(), z.end(), from);

    std::sort(z.begin(), z.end(), cmp_lcg);

    return z;
}

static const char NL = '\n';

func main() -> int {

    auto q = distributeIndices(7, 23);

    for(auto v: q)
        std::cout << v << " " << lcg(v) << NL;
    std::cout << NL;

    return 0;
}
© www.soinside.com 2019 - 2024. All rights reserved.