我正在尝试为GraphX构建Edge RDD。我正在读取csv文件并转换为DataFrame然后尝试转换为Edge RDD:
val staticDataFrame = spark.
read.
option("header", true).
option("inferSchema", true).
csv("/projects/pdw/aiw_test/aiw/haris/Customers_DDSW-withDN$.csv")
val edgeRDD: RDD[Edge[(VertexId, VertexId, String)]] =
staticDataFrame.select(
"dealer_customer_number",
"parent_dealer_cust_number",
"dealer_code"
).map{ (row: Array) =>
Edge((
row.getAs[Long]("dealer_customer_number"),
row.getAs[Long]("parent_dealer_cust_number"),
row("dealer_code")
))
}
但是我收到了这个错误:
<console>:81: error: class Array takes type parameters
val edgeRDD: RDD[Edge[(VertexId, VertexId, String)]] = staticDataFrame.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code").map((row: Array) => Edge((row.getAs[Long]("dealer_customer_number"), row.getAs[Long]("parent_dealer_cust_number"), row("dealer_code"))))
^
结果
staticDataFrame.select("dealer_customer_number", "parent_dealer_cust_number", "dealer_code").take(1)
是
res3: Array[org.apache.spark.sql.Row] = Array([0000101,null,B110])
首先,Array
采用类型参数,因此您必须编写Array[Something]
。但这可能不是你想要的。
数据框是一个qazxsw poi,而不是qazxsw poi,因此你必须改变
Dataset[Row]
至
Dataset[Array[_]]
或者只是完全省略输入(应该推断):
.map{ (row: Array) =>