主题中没有数据时如何控制火花流的处理

问题描述 投票:0回答:1

我正在使用spark-sql 2.4.1,spark-cassandra-connector_2.11-2.4.1.jar和java8。

我有这样的卡桑德拉表:

create company(company_id int, start_date date, company_name text ,  PRIMARY_KEY(company_id ,start_date  ) 
)WITH CLUSTERING ORDER BY ( start_date DESC );

开始日期在这里是派生字段,它是在业务逻辑中计算的。

我有spark-sql流式代码,其中我在mapFunction下面调用了。

public static MapFunction<Company, CompanyTransformed>  mapFunInsertCompany = ( record ) ->{

            CompanyTransformed  rec = new CompanyTransformed();

            rec.setCompany_id(record.getCompanyId());
            rec.setCompany_name(record.getCompanyName());

            if(record.getChangeFlag().equalsIgnoreCase("I") && record.getCreateDate() != null )
                rec.setStart_date(record.getCreateDate());
            if(record.getChangeFlag().equalsIgnoreCase("U"))
                rec.setStart_date(new Date(CommonUtils.today().getTime() + 86400000));

            return rec;
    };

启动我的消费者时,kafka主题中没有记录。对于空记录流,继续调用上述map函数。

因为有record.getCreateDate()= null起始日期设置为null。

但是主键的start_date部分因此插入失败并不确定地等待,可以将数据恢复并将其保存到C *表中。

所以应该怎么做才能解决?任何线索请

apache-spark apache-spark-sql spark-streaming datastax-enterprise
1个回答
0
投票

您在Spark Streaming应用程序中面临一个常见问题。当源中没有数据时(在您的情况下为Kafka主题),Spark将创建一个emptyRDD。您可以通过添加

来验证RDD是否为空
if(!rdd.isempty)

在调用方法mapFunInsertCompany之前。

也请看看此blog post

© www.soinside.com 2019 - 2024. All rights reserved.