I have a Java microservice that connects to an Apache Spark cluster and uses the DataStax Spark-Cassandra Connector to persist data to an Apache Cassandra cluster.
I wrote the following method to delete data in a specific date range from a Cassandra table.
The exact code is shown below:
public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {{
                put("keyspace", CassandraProperties.KEYSPACE);
                put("table", CassandraProperties.ENERGY_FORECASTS);
            }})
            .load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");

    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS
                    + " WHERE nodeid = '" + row.mkString() + "'"
                    + " AND timestamp >= '" + fromDate + "'"
                    + " AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}
@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();
}

The code runs fine when using a local Spark master (the .master("local[*]") option).
However, when I try to execute the same code against a remote Spark master node, the following error occurs:
Driver stacktrace:
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.func$4 of type org.apache.spark.api.java.function.ForeachPartitionFunction in instance of org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[pool-18-thread-1] INFO com.datastax.spark.connector.cql.CassandraConnector - Disconnected from Cassandra cluster: Test Cluster
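For context, the mechanism behind this error can be reproduced without Spark at all: a serializable Java lambda is written out as a java.lang.invoke.SerializedLambda, which on deserialization must be re-linked against the class that declared the lambda. Below is a minimal, Spark-free sketch of that round trip (the class and interface names here are made up for illustration; Spark's own ForeachPartitionFunction similarly extends Serializable):

```java
import java.io.*;
import java.util.function.Function;

public class SerializedLambdaDemo {
    // Hypothetical serializable functional interface for this demo.
    interface SerFn<T, R> extends Function<T, R>, Serializable {}

    public static void main(String[] args) throws Exception {
        SerFn<Integer, Integer> fn = x -> x + 1;

        // Round-trip through Java serialization, roughly what Spark does
        // when shipping a closure from the driver to an executor.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(fn); // written as java.lang.invoke.SerializedLambda
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            // Deserialization re-links the SerializedLambda against the
            // declaring class (SerializedLambdaDemo). It succeeds here only
            // because that class is on this JVM's classpath; on an executor
            // that lacks the application jar (or runs an incompatible Spark
            // build), this re-linking step is where things go wrong.
            @SuppressWarnings("unchecked")
            SerFn<Integer, Integer> copy = (SerFn<Integer, Integer>) ois.readObject();
            System.out.println(copy.apply(41)); // prints 42
        }
    }
}
```

This is one common explanation for SerializedLambda failures on remote clusters: the lambda deserializes into something the executor's copy of the Spark classes cannot reconcile with the expected field type.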
My Java is a bit rusty, but can you try extracting the lambda into a method?
public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read()
            .format("org.apache.spark.sql.cassandra")
            .options(new HashMap<String, String>() {{
                put("keyspace", CassandraProperties.KEYSPACE);
                put("table", CassandraProperties.ENERGY_FORECASTS);
            }})
            .load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");

    df.foreachPartition(new ForeachPartitionFunction<Row>() {
        public void call(Iterator<Row> partition) {
            Session session = connector.openSession();
            while (partition.hasNext()) {
                Row row = partition.next();
                session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS
                        + " WHERE nodeid = '" + row.mkString() + "'"
                        + " AND timestamp >= '" + fromDate + "'"
                        + " AND timestamp <= '" + toDate + "'");
            }
            session.close();
        }
    });
}
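As to why the anonymous class can behave differently from the lambda: a lambda travels as a SerializedLambda that the receiving JVM must re-link, whereas an anonymous class instance is serialized as an ordinary object of its own named, compiled class. A minimal, Spark-free sketch of the contrast (names are made up for illustration):

```java
import java.io.*;

public class AnonVsLambdaDemo {
    // Hypothetical serializable functional interface for this demo.
    interface SerTask extends Runnable, Serializable {}

    // Serialize and then deserialize an object in-process.
    static Object roundTrip(Object o) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        try (ObjectInputStream ois = new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray()))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SerTask lambda = () -> {};
        SerTask anon = new SerTask() {
            @Override public void run() {}
        };

        // The lambda is re-created by the JVM's lambda machinery on the
        // receiving side; its runtime class is a synthetic $$Lambda class.
        System.out.println(roundTrip(lambda).getClass().getName().contains("$$Lambda"));
        // The anonymous class deserializes as a plain instance of its own
        // compiled class file, which only needs to be on the classpath.
        System.out.println(roundTrip(anon) instanceof SerTask);
    }
}
```

Either way, the application jar still has to reach the executors (e.g. via spark.jars), but the anonymous class avoids the lambda re-linking step that the stack trace above is failing in.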