我正在开发一个动态脚本,可以join
任何给定的pyspark数据帧。问题是文件中的列名称会有所不同,连接条件的数量可能会有所不同。我可以在循环中处理这个,但是我用变量名执行连接失败了。
(我的目的是根据文件结构和连接条件动态填充a和b或更多列)
b="incrementalFile.Id1"
a="existingFile.Id"
unChangedRecords = existingFile.join(incrementalFile,(a==b),"left")
回溯(最近一次调用最后一次):文件“”,第1行,在文件“/usr/lib/spark/python/pyspark/sql/dataframe.py”,第818行,在join assert isinstance中(在[0],列上) ),“on应该是列或列的列”AssertionError:on应该是Column或Column列表
但是如果我没有在join
条件中放置任何变量,相同的代码工作正常,如下所示。
unChangedRecords = existingFile.join(
incrementalFile,
(existingFile.Id==incrementalFile.Id1),
"left")
在你的第二个例子中,existingFile.Id
是一个列,而不是一个字符串,但在你的第一个例子中,它是一个字符串。您想使用pyspark.sql.functions.col按名称引用列。它的文档没有示例,但它在同一页面上的alias示例中使用。