我有一个我广播的python字典,其中包含用户的日期过滤器。
nested_filter = {"user1":"2018-02-15"}
b_filter = sc.broadcast(nested_filter)
我想使用此广播变量来过滤较小的RDD,其行数小于过滤日期。
rdd_set = sc.parallelize([("user1","2018-02-05"), ("user1","2018-02-20")])
rdd_set.filter(lambda fields: fields <= b_filter.value.items()).collect()
但它返回一个空的RDD。
有人可以指出我做错了什么吗?另外,我需要将字符串日期转换为日期对象吗?
正确的结果应该是:
[("user1","2018-02-05")]
观察b_filter.value.items()
调用中filter
返回的值与以下内容相同:
nested_filter.items()
#[('user1', '2018-02-15')]
那么你的比较就变成了:
("user1","2018-02-05") < [('user1', '2018-02-15')]
#False
这是False
。假设nested_filter
是一个只包含1个项目的字典(如此处所示),您可能要做的是与列表的第一个元素进行比较:
("user1","2018-02-05") < nested_filter.items()[0]
#True
因此,要“修复”您的代码,您可以执行以下操作:
rdd_set.filter(lambda fields: fields <= b_filter.value.items()[0]).collect()
#[('user1', '2018-02-05')]
但相反,我认为你真正想要的是以下内容:
rdd_set.filter(lambda fields: fields[1] <= b_filter.value.get(fields[0])).collect()
#[('user1', '2018-02-05')]
这使用fields[0]
从nested_filter
获取日期(或者如果它不存在则返回None
)并将该值与fields[1]
进行比较。
正如您所指出的,这种比较将按字典顺序在字符串上进行。如果您的日期保持YYYY-MM-DD
格式,但对于其他日期格式,您可能需要转换为datetime
对象,这对您来说不是问题。