为什么在Spark 布隆过滤器中观察到的误报率高于预期?

问题描述 投票:0回答:1

在 Spark 中使用大量元素(>4 亿左右)创建一个 ffp(误报率)为 1% 的布隆过滤器时,观察到的误报率似乎要高得多,高达 20% .

我不认为这是布隆过滤器的已知限制。至少我在布隆过滤器数据结构的各种描述中没有看到类似的内容。那么这是 Spark 实现中的错误/限制吗?

请参阅下面我的 Spark shell 进行演示:

      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.0-amzn-0
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 17.0.10)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import java.security.MessageDigest
import java.security.MessageDigest

scala> import scala.util.Random
import scala.util.Random

scala> import org.apache.spark.util.sketch.BloomFilter
import org.apache.spark.util.sketch.BloomFilter

scala> 

scala> // Function to generate a random SHA1 hash

scala> def generateRandomSha1(): String = {
     |   val randomString = Random.alphanumeric.take(20).mkString
     |   val sha1 = MessageDigest.getInstance("SHA-1")
     |   sha1.update(randomString.getBytes("UTF-8"))
     |   val digest = sha1.digest
     |   digest.map("%02x".format(_)).mkString
     | }
generateRandomSha1: ()String

scala> 

scala> // Generate a DataFrame with 500 million rows of random SHA1 hashes

scala> val df = spark.range(500000000).map(_ => generateRandomSha1()).toDF("Hash")
df: org.apache.spark.sql.DataFrame = [Hash: string]

scala> // Create a bloom filter out of this collection of strings.

scala> val bloom_filter = df.stat.bloomFilter("Hash", 500000000, 0.01)
bloom_filter: org.apache.spark.util.sketch.BloomFilter = org.apache.spark.util.sketch.BloomFilterImpl@a14c0ba9

scala> // Generate another 10,000 random hashes

scala> val random_sha1s = List.fill(10000)(generateRandomSha1())
random_sha1s: List[String] = List(f3cbfd9bd836ea917ebc0dfc5330135cfde322a3, 4bff8d58799e517a1ba78236db9b52353dd39b56, 775bdd9d138a79eeae7308617f5c0d1d0e1c1697, abbd761b7768f3cbadbffc0c7947185856c4943d, 343692fe61c552f73ad6bc2d2d3072cc456da1db, faf4430055c528c9a00a46e9fae7dc25047ffaf3, 255b5d56c39bfba861647fff67704e6bc758d683, dae8e0910a368f034958ae232aa5f5285486a8ac, 3680dbd34437ca661592a7e4d39782c9c77fb4ba, f5b43f7a77c9d9ea28101a1848d8b1a1c0a65b82, 5bda825102026bc0da731dc84d56a499ccff0fe1, 158d7b3ce949422de421d5e110e3f6903af4f8e1, 2efcae5cb10273a0f5e89ae34fa3156238ab0555, 8d241012d42097f80f30e8ead227d75ab77086d2, 307495c98ae5f25026b91e60cf51d4f9f1ad7f4b, 8fc2f55563ab67d4ec87ff7b04a4a01e821814a3, b413572d14ee16c6c575ca3472adff62a8cbfa3d, 9219233b0e8afe57d7d5cb6...

scala> // Check how many of these random hashes return a positive result when passed into mightContain

scala> random_sha1s.map(c => bloom_filter.mightContain(c)).count(_ == true)
res0: Int = 2153

我预计这里会有大约 100 个阳性结果,即 1%,但实际比率超过 20% (2,153/10,000)。对于较小的布隆过滤器(例如,大约 100M 记录),速率更接近预期值。为什么会这样呢?过滤器的精度会随着规模的增加而恶化吗?

scala apache-spark bloom-filter
1个回答
0
投票

我明白了问题所在。这似乎是 Spark 中的一个错误。

下面是将新值放入集合中的函数:

  @Override
  public boolean putBinary(byte[] item) {
    int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
    int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);

    long bitSize = bits.bitSize();
    boolean bitsChanged = false;
    for (int i = 1; i <= numHashFunctions; i++) {
      int combinedHash = h1 + (i * h2);
      // Flip all the bits if it's negative (guaranteed positive number)
      if (combinedHash < 0) {
        combinedHash = ~combinedHash;
      }
      bitsChanged |= bits.set(combinedHash % bitSize);
    }
    return bitsChanged;
  }

使用的哈希值返回一个整数,负整数会翻转为正数,这意味着实际上所有哈希值都将是 0 到 Integer.MAX_VALUE(约 21.4 亿)之间的整数。这意味着即使位数组中的总位数大于此数字,也只会设置前约 21.4 亿位。这种可用位数的上限会导致 Bloom 过滤器的性能日益下降,其中 m 明显更大(当 n 大于 2.5 亿左右时就会出现这种情况)。

这个布隆过滤器实现是几年前从 Guava 分叉出来的。从那时起,该错误在 Guava 中被注意到(在本期中:https://github.com/google/guava/issues/1119),并且 Guava 版本不再表现出此行为。看起来该修复从未移植到 Spark。

© www.soinside.com 2019 - 2024. All rights reserved.