我正在使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。连同卡夫卡。
我有一个来自kafka主题的财务数据场景。像companyId,year,Quarter,sales,prev_sales数据。
val kafkaDf = sc.parallelize(Seq((15,2016, 4, 100.5,"")).toDF("companyId", "year","quarter", "sales","prev_sales")
我需要使用cassandra表中的上一年同季度数据进行prev_sales,如下图所示>>
val cassandraTabledf = sc.parallelize(Seq( (15,2016, 3, 120.6, 320.6), (15,2016, 2, 450.2,650.2), (15,2016, 1, 200.7,700.7), (15,2015, 4, 221.4,400), (15,2015, 3, 320.6,300), (15,2015, 2, 650.2,200), (15,2015, 1, 700.7,100))).toDF("companyId", "year","quarter", "sales","prev_sales")
即对于Seq((15,2016,4,100.5,“”)数据应为2015年第4季度数据,即221.4
所以新数据是
((15,2016,4,100.5,221.4)] >>
如何做到/实现这一目标?我们可以显式地进行查询,但是有什么方法可以在cassandra表上使用join来使用“滞后”函数?
我正在使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。随着卡夫卡。我有一个来自kafka主题的财务数据场景。如companyId,...
我认为它不需要任何leg
和lead
功能。您也可以通过join
获得所需的输出。检查以下代码以供参考:
注意:我在kafkaDF
中添加了更多数据,以提高理解。