如何使用SparkR :: read.jdbc()或sparklyr :: spark_read_jdbc()来获取SQL查询而不是整个表的结果?

问题描述 投票:0回答:1

我通常在本地使用RODBC查询我的数据库。但是,我们公司最近已迁移到Azure Databricks,它本身不支持RODBC或其他odbc连接,但支持我以前未使用过的jdbc连接。

我已经阅读了SparkR :: read.jdbc()和sparklyr :: spark_read_jdbc()的文档,但是这些似乎从数据库中提取了整个表,而不仅仅是查询的结果,这不适合我我不必拉整个表,而不必运行将多个表连接在一起的查询,而只返回每个表中很小的数据子集。

我找不到将jdbc连接器用于以下方法:

((A)运行引用同一数据库上多个表的查询

((B)将结果存储为R数据帧或可以很容易转换为R数据帧的内容(例如SparkR或sparklyr数据帧)。

[如果可能,该解决方案还只需要我为每个脚本/笔记本指定一次连接凭据,而不是每次我连接至数据库以运行查询并将结果存储为数据框时指定。

例如是否有与以下命令等效的jdbc:

my_server="myserver.database.windows.net"
my_db="mydatabase"
my_username="database_user"
my_pwd="abc123Ineedabetterpassword"


myconnection <- RODBC::odbcDriverConnect(paste0("DRIVER={SQL Server};
                                 server=",my_server,";
                                 database=",my_db,";
                                 uid=",my_username,";
                                 pwd=",my_pwd))

df <- RODBC::sqlQuery(myconnection, 
"SELECT a.var1, b.var2, SUM(c.var3) AS Total_Things, AVG(d.var4) AS Mean_Stuff
FROM table_A as a 
JOIN table_B as b on a.id = b.a_id
JOIN table_C as c on a.id = c.a_id
JOIN table_D as d on c.id = d.c_id
Where a.filter_var IN (1, 2, 3, 4)
AND d.filter_var LIKE '%potatoes%'
GROUP BY
a.var1, b.var2
")

df2 <- RODBC::sqlQuery(myconnection,
"SELECT x.var1, y.var2, z.var3
FROM table_x as x
LEFT JOIN table_y as y on x.id = y.x_id
LEFT JOIN table_z on as z on x.id = z.x_id
WHERE z.category like '%vegetable%'
AND y.category IN ('A', 'B', 'C')
“)

我如何使用SparkR或内置在Databricks中的sparklyr的jdbc连接器执行与上述相同的结果(两个R数据帧df和df2)?

[我知道我可以使用spark连接器和一些scala代码(https://docs.microsoft.com/en-us/azure/sql-database/sql-database-spark-connector)将查询结果存储为spark数据帧,将其转换为全局临时表,将全局临时表存储为SparkR数据帧并将其折叠到R数据帧,但是此代码很难阅读,需要我将笔记本中的一个单元格的语言更改为scala(我不太了解),并且由于数量众多,这需要很长时间步骤。因为我的R脚本通常以几个SQL查询开始-通常是对多个不同的数据库进行查询-这种方法非常耗时,并且使我的脚本几乎不可读。当然有更直接的方法吗?

((我们主要将Databricks用于通过LogicApps和Azure Data Factory进行自动化,偶尔用于增加RAM,而不是用于并行处理;我们的数据(一旦提取)通常不够大,无法并行化,我们使用的某些模型(例如lme4 :: lmer()不能从中受益。)

sql r jdbc sparkr sparklyr
1个回答
0
投票

我最终解决了这个问题,希望在其他人遇到问题时在此处发布答案。

您可以在查询中使用SparkR :: read.jdbc(),但必须将查询括在方括号中,并以某种形式对结果进行别名,否则将导致语法错误。然后,您可以在包含查询结果的SparkDataFrame上调用SparkR :: collect(),将其转换为R数据帧:

例如

myconnection <- "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydatabase;user=database_user;password=abc123Ineedabetterpassword"

df <- read.jdbc( myconnection, "(
SELECT a.var1, b.var2, SUM(c.var3) AS Total_Things, AVG(d.var4) AS Mean_Stuff
FROM table_A as a 
JOIN table_B as b on a.id = b.a_id
JOIN table_C as c on a.id = c.a_id
JOIN table_D as d on c.id = d.c_id
Where a.filter_var IN (1, 2, 3, 4)
AND d.filter_var LIKE '%potatoes%'
GROUP BY
a.var1, b.var2)) as result" ) %>% 
SparkR::collect()
© www.soinside.com 2019 - 2024. All rights reserved.