如何在火花流应用程序中使用滞后/超前功能?

问题描述 投票:0回答:1

我正在使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。连同卡夫卡。

我有一个来自kafka主题的财务数据场景。像companyId,year,Quarter,sales,prev_sales数据。

val kafkaDf = sc.parallelize(Seq((15,2016, 4, 100.5,"")).toDF("companyId", "year","quarter", "sales","prev_sales")

我需要使用cassandra表中的上一年同季度数据进行prev_sales,如下图所示>>

val cassandraTabledf = sc.parallelize(Seq(
  (15,2016, 3, 120.6, 320.6),
  (15,2016, 2, 450.2,650.2),
  (15,2016, 1, 200.7,700.7),
  (15,2015, 4, 221.4,400),
  (15,2015, 3, 320.6,300),
  (15,2015, 2, 650.2,200),
  (15,2015, 1, 700.7,100))).toDF("companyId", "year","quarter", "sales","prev_sales")

即对于Seq((15,2016,4,100.5,“”)数据应为2015年第4季度数据,即221.4

所以新数据是

((15,2016,4,100.5,221.4)] >>

如何做到/实现这一目标?我们可以显式地进行查询,但是有什么方法可以在cassandra表上使用join来使用“滞后”函数?

我正在使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。随着卡夫卡。我有一个来自kafka主题的财务数据场景。如companyId,...

apache-spark cassandra apache-spark-sql
1个回答
0
投票

我认为它不需要任何leglead功能。您也可以通过join获得所需的输出。检查以下代码以供参考:

注意:我在kafkaDF中添加了更多数据,以提高理解。

© www.soinside.com 2019 - 2024. All rights reserved.