我有一个dataframe
,其中有几行。我可以使用以下代码循环浏览此dataframe
:
for row in df.rdd.collect():
但是这不能同时使用吗?因此,我要映射每行并将其传递给UDF,然后根据行中的值返回另一个新的数据帧(来自数据库)。
我尝试过df.rdd.map(lambda row:read_from_mongo(row,spark))。toDF()
但是我遇到了这个错误:
_ pickle.PicklingError:无法序列化对象:异常:您似乎正在尝试从广播变量,动作或变换。 SparkContext只能在驱动程序上使用,而不是在工作程序上运行的代码中使用。欲了解更多信息,请参阅SPARK-5063。
如何并行循环dataframe
并保持每行返回的dataframe
?
创建的每个Spark RDD或DataFrame都与应用程序的SparkContext相关联,并且SparkContext只能在驱动程序代码中引用。您的返回数据框架的UDF尝试从工作程序而不是从驱动程序引用SparkContext。那么,为什么需要为每一行创建一个单独的DataFrame?如果-您希望以后将生成的DataFrame合并为一个。-第一个DataFrame足够小。然后,您可以简单地收集DataFrame的内容并将其用作过滤器以从Mongodb返回行。在这里,为了实现并行性,您需要依赖用于连接到Mongodb的连接器。