Pyspark - RDD过滤器与广播词典中的日期

问题描述 投票:0回答:1

我有一个我广播的python字典,其中包含用户的日期过滤器。

nested_filter = {"user1":"2018-02-15"}
b_filter = sc.broadcast(nested_filter)

我想使用此广播变量来过滤较小的RDD,其行数小于过滤日期。

rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])

rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()

但它返回一个空的RDD。

有人可以指出我做错了什么吗?另外,我需要将字符串日期转换为日期对象吗?

正确的结果应该是:

[("user1","2018-02-05")]
python apache-spark pyspark rdd
1个回答
1
投票

观察b_filter.value.items()调用中filter返回的值与以下内容相同:

nested_filter.items()
#[('user1', '2018-02-15')]

那么你的比较就变成了:

("user1","2018-02-05") < [('user1', '2018-02-15')]
#False

这是False。假设nested_filter是一个只包含1个项目的字典(如此处所示),您可能要做的是与列表的第一个元素进行比较:

("user1","2018-02-05") < nested_filter.items()[0]
#True

因此,要“修复”您的代码,您可以执行以下操作:

rdd_set.filter(lambda fields: fields <= b_filter.value.items()[0]).collect()
#[('user1', '2018-02-05')]

但相反,我认为你真正想要的是以下内容:

rdd_set.filter(lambda fields: fields[1] <= b_filter.value.get(fields[0])).collect()
#[('user1', '2018-02-05')]

这使用fields[0]nested_filter获取日期(或者如果它不存在则返回None)并将该值与fields[1]进行比较。

正如您所指出的,这种比较将按字典顺序在字符串上进行。如果您的日期保持YYYY-MM-DD格式,但对于其他日期格式,您可能需要转换为datetime对象,这对您来说不是问题。

© www.soinside.com 2019 - 2024. All rights reserved.