Apache Spark java.lang.ClassCastException when running forEachPartition against a remote master node

Question (votes: 0, answers: 1)

I have a Java microservice that connects to an Apache Spark cluster and uses the DataStax Spark-Cassandra connector to persist data to an Apache Cassandra database cluster.

I wrote the following method to delete data within a given date range from a Cassandra table.

The exact code looks like this:

public void deleteData(String fromDate, String toDate) {


    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
            .filter(col("timestamp")
                    .substr(1, 10)
                    .between(fromDate, toDate))
            .select("nodeid");


    df.foreachPartition(partition -> {
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
        }
        session.close();
    });
}


@Bean
public SparkSession sparkSession() {
    return SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)
            .config("spark.cassandra.connection.port", port)
            .config("spark.sql.caseSensitive", false)
            .master(master)
            .getOrCreate();
}
The code runs fine when executed against a local Spark master (the .master("local[*]") option).
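For reference, the only difference in the working local setup is the master URL; a minimal sketch of that configuration (assuming the same host and port fields as in the bean above) looks like this:

    import org.apache.spark.sql.SparkSession;

    SparkSession localSession = SparkSession
            .builder()
            .appName("SparkCassandraApp")
            .config("spark.cassandra.connection.host", host)  // same Cassandra settings as the remote setup
            .config("spark.cassandra.connection.port", port)
            .master("local[*]")                                // local master instead of the remote spark:// URL
            .getOrCreate();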

However, when I try to execute the same code while connected to a remote Spark master node, the following error occurs:

Driver stacktrace:
Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2.func$4 of type org.apache.spark.api.java.function.ForeachPartitionFunction in instance of org.apache.spark.sql.Dataset$$anonfun$foreachPartition$2
    at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2287)
    at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1417)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2293)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[pool-18-thread-1] INFO com.datastax.spark.connector.cql.CassandraConnector - Disconnected from Cassandra cluster: Test Cluster

apache-spark foreach classcastexception delete-row spark-cassandra-connector
1 Answer

0 votes

My Java is rusty, but could you try replacing the lambda with an explicit ForeachPartitionFunction?

public void deleteData(String fromDate, String toDate) {
    SparkConf conf = sparkSession.sparkContext().getConf();
    CassandraConnector connector = CassandraConnector.apply(conf);

    Dataset<Row> df = sparkSession.read().format("org.apache.spark.sql.cassandra").options(new HashMap<String, String>() {{
        put("keyspace", CassandraProperties.KEYSPACE);
        put("table", CassandraProperties.ENERGY_FORECASTS);
    }}).load()
        .filter(col("timestamp")
                .substr(1, 10)
                .between(fromDate, toDate))
        .select("nodeid");


    df.foreachPartition(new ForeachPartitionFunction<Row>() {
        public void call(Iterator<Row> partition) {
            Session session = connector.openSession();
            while (partition.hasNext()) {
                Row row = partition.next();
                session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "." + CassandraProperties.ENERGY_FORECASTS + " WHERE nodeid = '" + row.mkString() + "' AND timestamp >= '" + fromDate + "' AND timestamp <= '" + toDate + "'");
            }
            session.close();
        }
    });
}
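If you want to take that one step further, a variant of the same idea (just a sketch, not from the original post, and assuming the same CassandraProperties constants and the driver's Session API) is to move the logic into a named top-level class. ForeachPartitionFunction is itself Serializable, so the object shipped to the executors is then a plain named class rather than a synthetic lambda; the connector and the date bounds are passed in through the constructor:

import java.util.Iterator;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Row;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

// Hypothetical sketch: a named ForeachPartitionFunction instead of a lambda or anonymous class.
public class DeleteRowsFunction implements ForeachPartitionFunction<Row> {

    private final CassandraConnector connector;
    private final String fromDate;
    private final String toDate;

    public DeleteRowsFunction(CassandraConnector connector, String fromDate, String toDate) {
        this.connector = connector;
        this.fromDate = fromDate;
        this.toDate = toDate;
    }

    @Override
    public void call(Iterator<Row> partition) {
        // One Cassandra session per partition, closed once the partition has been processed.
        Session session = connector.openSession();
        while (partition.hasNext()) {
            Row row = partition.next();
            session.execute("DELETE FROM " + CassandraProperties.KEYSPACE + "."
                    + CassandraProperties.ENERGY_FORECASTS
                    + " WHERE nodeid = '" + row.mkString()
                    + "' AND timestamp >= '" + fromDate
                    + "' AND timestamp <= '" + toDate + "'");
        }
        session.close();
    }
}

Usage would then be:

    df.foreachPartition(new DeleteRowsFunction(connector, fromDate, toDate));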