错误显示:
原因:java.lang.NullPointerException:请求的TypeTagGettableToMappedTypeConverter由于以下原因而无法反序列化TypeTag:Scala 2.10 TypeTag限制。它们以空值形式返回,因此您会看到此NPE。
gradle.build
dependencies {
implementation group: 'org.scala-lang', name: 'scala-library', version: '2.12.11'
implementation group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.5'
implementation group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.5'
implementation group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.12', version: '2.5.0'
implementation group: 'org.apache.spark', name: 'spark-mllib_2.12', version: '2.4.5'
implementation group: 'log4j', name: 'log4j', version: '1.2.17'
implementation group: 'org.scalaj', name: 'scalaj-http_2.12', version: '2.4.2'
}
Scala对象
object SparkModule {
case class UDTCaseClass(a: Int = 0, b: Float = 0f, c: Int = 0, d: Int = 0)
case class TableCaseClass(id: UUID, col1: Boolean, list: List[UDTCaseClass])
val spark = SparkSession.builder
.master("local[2]")
.appName("App")
.config("spark.cassandra.connection.host", "127.0.0.1")
.config("spark.cassandra.connection.port", "9042")
.config("spark.executor.cores", "1")
.getOrCreate()
val sc = spark.sparkContext
sc.setLogLevel("WARN")
val cassandraRDD = sc.cassandraTable[TableCaseClass](
"keyspace", "table"
).limit(20)
println(cassandraRDD.count())
}
起初,有时会显示错误,有时却没有显示,直到我将其缩小并意识到它显示了何时UDT的任何字段为null
,否则它就可以正常工作。例如,如果表包含以下任一行,则会引发错误:
f39b5201-1e96-44a8-946c-d959c217f174 |错误| [{a:123,b:2.3,c:33,d:null}]f39b5201-1e96-44a8-946c-d959c217f174 |错误| [{a:123,b:2.3,c:null,d:34}]f39b5201-1e96-44a8-946c-d959c217f174 |错误| [{a:123,b:null,c:33,d:12}]f39b5201-1e96-44a8-946c-d959c217f174 |错误| [{a:null,b:2.3,c:33,d:22}]
例如,这个:
f39b5201-1e96-44a8-946c-d959c217f174 |错误| null
cassandraTable
可以正常读取。
我尝试使用Option
这样:case class UDTCaseClass(a: Option[Int] = None, b: Option[Float] = None, c: Option[Int] = None, d: Option[Int] = None)
,但出现相同的错误。
我总是可以只插入0而不是null
,但是可以避免吗?
谢谢
使用Spark 2.4.2 / Scala 2.12和SCC 2.5.0正常工作。
对于以下UDT /表和数据:
CREATE TYPE test.udt (
id int,
t1 int,
t2 int,
a2 int
);
CREATE TABLE test.u3 (
id int PRIMARY KEY,
u list<frozen<udt>>
);
insert into test.u3(id, u) values (5, [{id: 1, t1: 3}]);
以下Scala代码可以正常工作:
case class UDT(id: Int, t1: Int, t2: Option[Int], a2: Option[Int])
case class U3(id: Int, u: List[UDT])
import com.datastax.spark.connector._
val d = sc.cassandraTable[U3]("test", "u3")
d.collect
它按预期返回:Array(U3(5,List(UDT(1,3,None,None))))
。
您的错误可能是由于您可能未重新编译代码或以某种方式将其缓存的问题引起的。
P.S。正如我在评论中指出的那样,如果您只是开始,则更喜欢使用Dataframe API-它是fully supported by SCC。