我正在使用PySpark处理一个庞大的数据集,我希望根据另一个数据框中的字符串过滤数据帧。例如,
dd = spark.createDataFrame(["something.google.com","something.google.com.somethingelse.ac.uk","something.good.com.cy", "something.good.com.cy.mal.org"], StringType()).toDF('domains')
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com |
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy |
|something.good.com.cy.mal.org |
+----------------------------------------+
dd1 = spark.createDataFrame(["google.com", "good.com.cy"], StringType()).toDF('gooddomains')
+-----------+
|gooddomains|
+-----------+
|google.com |
|good.com.cy|
+-----------+
我假设domains
和gooddomains
是有效的域名。
我想要做的是过滤掉dd
中不以dd1
结尾的匹配字符串。所以在上面的例子中,我想过滤掉第1行和第3行,最后得到
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org |
+----------------------------------------+
我目前的解决方案(如下所示)只能考虑最多3个字的域名。如果我在verygood.co.ac.uk
(即白名单)中添加说法,dd1
,那么它将会失败。
def split_filter(x, whitelist):
splitted1 = x.select(F.split(x['domains'], '\.').alias('splitted_domains'))
last_two = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
F.lit('.'), \
splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_two'))
last_three = splitted1.select(F.concat(splitted1.splitted_domains[F.size(splitted1.splitted_domains)-3], \
F.lit('.'), \
splitted1.splitted_domains[F.size(splitted1.splitted_domains)-2], \
F.lit('.'), \
splitted1.splitted_domains[F.size(splitted1.splitted_domains)-1]).alias('last_three'))
x = x.withColumn('id', F.monotonically_increasing_id())
last_two = last_two.withColumn('id', F.monotonically_increasing_id())
last_three = last_three.withColumn('id', F.monotonically_increasing_id())
final_d = x.join(last_two, ['id']).join(last_three, ['id'])
df1 = final_d.join(whitelist, final_d['last_two'] == whitelist['domains'], how = 'left_anti')
df2 = df1.join(whitelist, df1['last_three'] == whitelist['domains'], how = 'left_anti')
return df2.drop('id')
我使用Spark 2.3.0和Python 2.7.5。
让我们扩展domains
以获得更好的覆盖范围:
domains = spark.createDataFrame([
"something.google.com", # OK
"something.google.com.somethingelse.ac.uk", # NOT OK
"something.good.com.cy", # OK
"something.good.com.cy.mal.org", # NOT OK
"something.bad.com.cy", # NOT OK
"omgalsogood.com.cy", # NOT OK
"good.com.cy", # OK
"sogood.example.com", # OK Match for shorter redundant, mismatch on longer
"notsoreal.googleecom" # NOT OK
], "string").toDF('domains')
good_domains = spark.createDataFrame([
"google.com", "good.com.cy", "alsogood.com.cy",
"good.example.com", "example.com" # Redundant case
], "string").toDF('gooddomains')
现在......只使用Spark SQL原语的简单解决方案是简化当前的方法。既然您已经声明可以安全地假设这些是有效的公共域,我们可以定义这样的函数:
from pyspark.sql.functions import col, regexp_extract
def suffix(c):
return regexp_extract(c, "([^.]+\\.[^.]+$)", 1)
提取顶级域和第一级子域:
domains_with_suffix = (domains
.withColumn("suffix", suffix("domains"))
.alias("domains"))
good_domains_with_suffix = (good_domains
.withColumn("suffix", suffix("gooddomains"))
.alias("good_domains"))
domains_with_suffix.show()
+--------------------+--------------------+
| domains| suffix|
+--------------------+--------------------+
|something.google.com| google.com|
|something.google....| ac.uk|
|something.good.co...| com.cy|
|something.good.co...| mal.org|
|something.bad.com.cy| com.cy|
| omgalsogood.com.cy| com.cy|
| good.com.cy| com.cy|
| sogood.example.com| example.com|
|notsoreal.googleecom|notsoreal.googleecom|
+--------------------+--------------------+
现在我们可以外连接:
from pyspark.sql.functions import (
col, concat, lit, monotonically_increasing_id, sum as sum_
)
candidates = (domains_with_suffix
.join(
good_domains_with_suffix,
col("domains.suffix") == col("good_domains.suffix"),
"left"))
并过滤结果:
is_good_expr = (
col("good_domains.suffix").isNotNull() & # Match on suffix
(
# Exact match
(col("domains") == col("gooddomains")) |
# Subdomain match
col("domains").endswith(concat(lit("."), col("gooddomains")))
)
)
not_good_domains = (candidates
.groupBy("domains") # .groupBy("suffix", "domains") - see the discussion
.agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))
.filter(~col("any_good"))
.drop("any_good"))
not_good_domains.show(truncate=False)
+----------------------------------------+
|domains |
+----------------------------------------+
|omgalsogood.com.cy |
|notsoreal.googleecom |
|something.good.com.cy.mal.org |
|something.google.com.somethingelse.ac.uk|
|something.bad.com.cy |
+----------------------------------------+
这比Cartesian product required for direct join with LIKE
好,但是对蛮力不满意,在最坏的情况下需要两次洗牌 - 一次用于join
(如果good_domains
足够小到broadcasted
,这可以跳过),另一次用于group_by
+ agg
。
不幸的是,Spark SQL不允许自定义分区器只使用一次shuffle(但是在RDD API中可能使用composite key)并且优化器还不够智能,以优化join(_, "key1")
和.groupBy("key1", _)
。
如果你可以接受一些假阴性,你可以去概率。首先让我们建立概率计数器(这里使用bounter
,在toolz
的帮助下)
from pyspark.sql.functions import concat_ws, reverse, split
from bounter import bounter
from toolz.curried import identity, partition_all
# This is only for testing on toy examples, in practice use more realistic value
size_mb = 20
chunk_size = 100
def reverse_domain(c):
return concat_ws(".", reverse(split(c, "\\.")))
def merge(acc, xs):
acc.update(xs)
return acc
counter = sc.broadcast((good_domains
.select(reverse_domain("gooddomains"))
.rdd.flatMap(identity)
# Chunk data into groups so we reduce the number of update calls
.mapPartitions(partition_all(chunk_size))
# Use tree aggregate to reduce pressure on the driver,
# when number of partitions is large*
# You can use depth parameter for further tuning
.treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))
接下来定义这样的用户定义函数函数
from pyspark.sql.functions import pandas_udf, PandasUDFType
from toolz import accumulate
def is_good_counter(counter):
def is_good_(x):
return any(
x in counter.value
for x in accumulate(lambda x, y: "{}.{}".format(x, y), x.split("."))
)
@pandas_udf("boolean", PandasUDFType.SCALAR)
def _(xs):
return xs.apply(is_good_)
return _
并过滤domains
:
domains.filter(
~is_good_counter(counter)(reverse_domain("domains"))
).show(truncate=False)
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org |
|something.bad.com.cy |
|omgalsogood.com.cy |
|notsoreal.googleecom |
+----------------------------------------+
在Scala中,可以使用bloomFilter
完成
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.BloomFilter
def reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\\.")))
val checker = good_domains.stat.bloomFilter(
// Adjust values depending on the data
reverseDomain($"gooddomains"), 1000, 0.001
)
def isGood(checker: BloomFilter) = udf((s: String) =>
s.split('.').toStream.scanLeft("") {
case ("", x) => x
case (acc, x) => s"${acc}.${x}"
}.tail.exists(checker mightContain _))
domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org |
|something.bad.com.cy |
|omgalsogood.com.cy |
|notsoreal.googleecom |
+----------------------------------------+
如果需要,shouldn't be hard to call such code from Python。
由于近似性质,这可能仍然不完全令人满意。如果您需要精确的结果,可以尝试利用数据的冗余性质,例如使用trie(此处使用datrie
实现)。
如果good_domains
相对较小,您可以使用与概率变量类似的方式创建单个模型:
import string
import datrie
def seq_op(acc, x):
acc[x] = True
return acc
def comb_op(acc1, acc2):
acc1.update(acc2)
return acc1
trie = sc.broadcast((good_domains
.select(reverse_domain("gooddomains"))
.rdd.flatMap(identity)
# string.printable is a bit excessive if you need standard domain
# and not enough if you allow internationalized domain names.
# In the latter case you'll have to adjust the `alphabet`
# or use different implementation of trie.
.treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))
定义用户定义的函数:
def is_good_trie(trie):
def is_good_(x):
if not x:
return False
else:
return any(
x == match or x[len(match)] == "."
for match in trie.value.iter_prefixes(x)
)
@pandas_udf("boolean", PandasUDFType.SCALAR)
def _(xs):
return xs.apply(is_good_)
return _
并将其应用于数据:
domains.filter(
~is_good_trie(trie)(reverse_domain("domains"))
).show(truncate=False)
+----------------------------------------+
|domains |
+----------------------------------------+
|something.google.com.somethingelse.ac.uk|
|something.good.com.cy.mal.org |
|something.bad.com.cy |
|omgalsogood.com.cy |
|notsoreal.googleecom |
+----------------------------------------+
这种特定的方法在假设所有good_domains
可以压缩成单个trie的情况下工作,但是可以很容易地扩展以处理不满足该假设的情况。例如,您可以为每个顶级域或后缀构建单个trie(如天真解决方案中所定义)
(good_domains
.select(suffix("gooddomains"), reverse_domain("gooddomains"))
.rdd
.aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))
然后,根据需要从序列化版本加载模型,或使用RDD
操作。
这两种非本机方法可以根据数据,业务需求(如近似解决方案中的假负容差)和可用资源(驱动程序内存,执行程序内存,suffixes
的基数,访问分布式POSIX兼容的分布式文件)进一步调整系统等)。在DataFrames
和RDDs
(内存使用,通信和序列化开销)上应用这些时,还需要考虑一些权衡因素。
如果我理解正确,您只需要使用简单的SQL字符串匹配模式进行左反连接。
from pyspark.sql.functions import expr
dd.alias("l")\
.join(
dd1.alias("r"),
on=expr("l.domains LIKE concat('%', r.gooddomains)"),
how="leftanti"
)\
.select("l.*")\
.show(truncate=False)
#+----------------------------------------+
#|domains |
#+----------------------------------------+
#|something.google.com.somethingelse.ac.uk|
#|something.good.com.cy.mal.org |
#+----------------------------------------+
表达式concat('%', r.gooddomains)
在r.gooddomains
之前加上一个通配符。
接下来,我们使用l.domains LIKE concat('%', r.gooddomains)
来查找与此模式匹配的行。
最后,指定how="leftanti"
以便仅保留不匹配的行。
更新:正如the comments在@user10938362中所指出的,这种方法存在两个缺陷:
1)由于这仅查看匹配的后缀,因此存在产生错误结果的边缘情况。例如:
example.com
应该匹配example.com
和subdomain.example.com
,但不匹配fakeexample.com
有两种方法可以解决这个问题。第一个是修改LIKE
表达式来处理这个问题。由于我们知道这些都是有效域,因此我们可以检查完全匹配或域后跟一个点:
like_expr = " OR ".join(
[
"(l.domains = r.gooddomains)",
"(l.domains LIKE concat('%.', r.gooddomains))"
]
)
dd.alias("l")\
.join(
dd1.alias("r"),
on=expr(like_expr),
how="leftanti"
)\
.select("l.*")\
.show(truncate=False)
类似地,可以使用带有后视的正则表达式模式的RLIKE
。
2)更大的问题是,正如here所解释的那样,加入LIKE
表达式会产生笛卡尔积。如果dd1
小到可以广播,那么这不是问题。
否则,您可能会遇到性能问题,并且必须尝试不同的方法。
更多关于LIKE
的PySparkSQL Apache HIVE docs运算符:
A LIKE B
:
如果字符串A与SQL简单正则表达式B匹配,则为TRUE,否则为FALSE。比较是逐个字符地完成的。 B中的
_
字符匹配A中的任何字符(类似于posix正则表达式中的.
),B中的%
字符匹配A中的任意数量的字符(类似于posix正则表达式中的.*
)。例如,'foobar' LIKE 'foo'
评估为FALSE,其中'foobar' LIKE 'foo___'
评估为TRUE,'foobar' LIKE 'foo%'
也是如此。要逃离%
使用\
(%
匹配一个%
角色)。如果数据包含分号,并且您想要搜索它,则需要对其进行转义,columnValue LIKE 'a\;b'
注意:这利用了使用pyspark.sql.functions.expr
到pass in a column value as a parameter to a function的“技巧”。