我正在使用spark-sql 2.4.1,spark-cassandra-connector_2.11-2.4.1.jar和java8。
我有这样的卡桑德拉表:
CREATE company(company_id int, start_date date, company_name text, PRIMARY_KEY (company_id, start_date))
WITH CLUSTERING ORDER BY (start_date DESC);
此处的字段start_date是一个派生字段,它是在业务逻辑中计算的。
我具有在mapFunction下面调用的spark-sql流式代码。
public static MapFunction<Company, CompanyTransformed> mapFunInsertCompany = ( record ) ->{
CompanyTransformed rec = new CompanyTransformed();
rec.setCompany_id(record.getCompanyId());
rec.setCompany_name(record.getCompanyName());
if(record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null )
rec.setStart_date(record.getCreateDate());
if(record.getChangeFlag().equalsIgnoreCase("U"))
rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));
return rec;
};
启动我的使用者并且在kafka主题中没有记录时,流媒体流连续调用以上map函数。
因为record.getCreateDate()= null start_date设置为null。
但是start_date是我的C *表中主键的一部分,因此,插入失败和无限期地等待火花,无法恢复并将数据保存到C *表中。
所以1.应该怎么做才能解决?有任何线索吗?
第2部分:
最新记录.writeStream().foreachBatch(((batchDf,batchId)-> {批处理。写().format(“ org.apache.spark.sql.cassandra”).option(“表格”,“公司”).option(“ keyspace”,“ ks_1”).mode(SaveMode.Append)。救();})。start().. awaitTermination();
我正在使用上述Java API,我没有找到用于检查Java中的“ isEmpty” rdd的等效方法。
任何线索如何在Java中处理?
第3部分:
尝试过此
.foreachBatch((batchDf, batchId) -> {
System.out.println( "latestRecords batchDf.isEmpty : " +
batchDf.isEmpty() + "\t length : " + batchDf.rdd().getPartitions().length);
}
提供输出为
latestRecords batchDf.isEmpty : false length : 6
那么如何检查isEmpty? as isEmpty:false