我在Azure数据块中有一个很大的数据集作为Spark数据框,并使用R代码来分析数据。我正在将在本地桌面RStudio中工作的R代码转换为Databricks R代码。我正在尝试基于n_distinct(column)> 2过滤一个较大的spark数据帧,以进行进一步的分析。
我已经尝试在Azure数据块的RStudio中使用可工作的本地桌面Rstidio代码。
需要帮助将“ filter(n_distinct(carb)> 2)”转换为火花代码
用于数据块中的Rstudio或R笔记本。
## working desktop R code
library(dplyr)
set.seed(10)
df <- data.frame(mtcars)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df %>% group_by(cyl) %>% filter( n_distinct(carb)>2)
df.dt1
## Databricks - RStudio code
set.seed(10)
## use the mtcars dataset
df <- data.frame(mtcars)
## copying to Spark
df.spark <- copy_to(sc, df, "df_spark", overwrite = TRUE)
## filter the dataset to have only those "cyl" which have number of "carb" more than 2
df.dt1<- df.spark %>% group_by(cyl) %>% filter(dplyr::n_distinct(carb)>2) %>% collect()
错误:此数据库不支持窗口功能distinct()
预期输出如下
cyl disp hp drat wt qsec vs am gear carb
<dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
6 160 110 3.9 2.62 16.46 0 1 4 4
6 160 110 3.9 2.875 17.02 0 1 4 4
6 258 110 3.08 3.215 19.44 1 0 3 1
8 360 175 3.15 3.44 17.02 0 0 3 2
6 225 105 2.76 3.46 20.22 1 0 3 1
8 360 245 3.21 3.57 15.84 0 0 3 4
6 167.6 123 3.92 3.44 18.3 1 0 4 4
6 167.6 123 3.92 3.44 18.9 1 0 4 4
8 275.8 180 3.07 4.07 17.4 0 0 3 3
8 275.8 180 3.07 3.73 17.6 0 0 3 3
结果数据集将仅具有来自“ cyl” 6和8的记录,它们分别具有唯一的“ carb” 3和4的计数,而省略了cyl 4的记录,因为它具有carb 2的唯一计数。
## actual working code from my dataset in RStudio in Databricks
multi_contract <- Cust_details %>%
group_by(CustomerID) %>%
## filter records for customers having more than one contract
filter(n_distinct(ContractType)>1)
此代码的问题在于,处理100万条记录大约需要1个小时,而结果数据集只有41k条记录。因此,必须在sparklyr或sparkR中有更好的方法来进行此操作。
这是一种用于计算不使用distinct
的组B中某个值A的不同观察值的方法:
df %>%
distinct(A, B) %>%
group_by(B) %>%
summarise(distinct_A = n())
您可以将结果与分组列B
上的原始df内部合并,以获得所需的结果。就您的示例而言,
sc <- spark_connect(master = "local")
mtcars_spark <- sdf_copy_to(sc, mtcars, "mtcars_spark")
keep_cyl <- mtcars_spark %>%
distinct(cyl, carb) %>%
group_by(cyl) %>%
summarise(distinct_carb_count = n()) %>%
filter(distinct_carb_count > 2)
inner_join(keep_cyl, mtcars_spark)