如何在Kafka主题中没有数据的情况下控制火花流的处理

问题描述 投票:0回答:1

我正在使用spark-sql 2.4.1,spark-cassandra-connector_2.11-2.4.1.jar和java8。

我有这样的卡桑德拉表:

CREATE company(company_id int, start_date date, company_name text, PRIMARY_KEY (company_id, start_date))
WITH CLUSTERING ORDER BY (start_date DESC);

此处的字段start_date是一个派生字段,它是在业务逻辑中计算的。

我具有在mapFunction下面调用的spark-sql流式代码。

public static MapFunction<Company, CompanyTransformed>  mapFunInsertCompany = ( record ) ->{

  CompanyTransformed  rec = new CompanyTransformed();

  rec.setCompany_id(record.getCompanyId());
  rec.setCompany_name(record.getCompanyName());

  if(record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null )
    rec.setStart_date(record.getCreateDate());
  if(record.getChangeFlag().equalsIgnoreCase("U"))
    rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));

  return rec;
};

启动我的使用者并且在kafka主题中没有记录时,流媒体流连续调用以上map函数。

因为record.getCreateDate()= null start_date设置为null。

但是start_date是我的C *表中主键的一部分,因此,插入失败和无限期地等待火花,无法恢复并将数据保存到C *表中。

所以1.应该怎么做才能解决?有任何线索吗?

第2部分:

  1. 如何从故障中恢复?

最新记录.writeStream().foreachBatch(((batchDf,batchId)-> {批处理。写().format(“ org.apache.spark.sql.cassandra”).option(“表格”,“公司”).option(“ keyspace”,“ ks_1”).mode(SaveMode.Append)。救();})。start().. awaitTermination();

我正在使用上述Java API,我没有找到用于检查Java中的“ isEmpty” rdd的等效方法。

任何线索如何在Java中处理?

第3部分:

尝试过此

.foreachBatch((batchDf, batchId) -> {
    System.out.println( "latestRecords batchDf.isEmpty : " + 
     batchDf.isEmpty() + "\t length : " + batchDf.rdd().getPartitions().length);
 }

提供输出为

latestRecords batchDf.isEmpty : false    length : 6

那么如何检查isEmpty? as isEmpty:false

apache-spark apache-spark-sql spark-streaming datastax-enterprise
1个回答
1
投票

您在Spark Streaming应用程序中面临一个常见问题。当源中没有数据时(在您的情况下为Kafka主题),Spark将创建一个emptyRDD。您可以通过添加

来验证RDD是否为空
if(!rdd.isEmpty)

在调用方法mapFunInsertCompany之前。

也请查看此blog post

© www.soinside.com 2019 - 2024. All rights reserved.