我想从sparklyr
中的Spark DataFrame的每个类中采样n行。
我知道dplyr::sample_n
函数不能用于此(Is sample_n really a random sample when used with sparklyr?),所以我已经使用了sparklyr::sdf_sample()
函数。问题是我无法按组抽样,即从每个类中获取10个观察值,我只能指定要抽样的整个数据集的一部分。
我有一种解决方法,可以在循环中分别在每个组上使用sdf_sample()
,但是由于该函数未返回确切的样本大小,因此仍然不理想。
解决方法的R代码:
library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", version = "2.3")
# copy iris to our spark cluster
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
# get class counts
class_counts <- iris_tbl %>% count(Species) %>%
collect()
# Species n
# <chr> <dbl>
#1 versicolor 50
#2 virginica 50
#3 setosa 50
# we want to sample n = 10 points from each class
n <- 10
sampled_iris <- data.frame(stringsAsFactors = F)
for( i in seq_along(class_counts$Species)){
my_frac <- n / class_counts[[i, 'n']]
my_class <- class_counts[[i, 'Species']]
tmp <- iris_tbl %>%
filter(Species == my_class) %>%
sdf_sample(fraction = my_frac) %>%
collect()
sampled_iris <- bind_rows(sampled_iris, tmp)
}
我们没有从每个班级中获得确切的10个样本:
# new counts
sampled_iris %>% count(Species)
#Species n
# <chr> <int>
#1 setosa 7
#2 versicolor 9
#3 virginica 6
我想知道是否有更好的方法使用Sparklyr在各个组之间获取均衡的样本?甚至使用sql查询,我都可以使用DBI::dbGetQuery()
直接将其传递给集群?
我无法按组抽样
只要分组列是字符串(这是sparklyr
类型映射的限制),就可以使用DataFrameStatFunctions.sampleBy
轻松地处理该部分:
spark_dataframe(iris_tbl) %>%
sparklyr::invoke("stat") %>%
sparklyr::invoke(
"sampleBy",
"Species",
fractions=as.environment(list(
"setosa"=0.2,
"versicolor"=0.2,
"virginica"=0.2
)),
seed=1L
) %>% sparklyr::sdf_register()
然而,没有任何分布式且可扩展的方法可以为您提供“准确的样本量”。可以使用以下技巧:]
iris_tbl %>%
group_by(Species) %>%
mutate(rand = rand()) %>%
arrange(rand, .by_group=TRUE) %>%
filter(row_number() <= 10) %>%
select(-rand)
但是这种方法依赖于窗口功能,对倾斜的数据分布高度敏感,并且通常无法很好地扩展。
[如果样本较小,则可以进一步进行此操作,但是先进行过度采样(使用第一种方法),然后再进行精确的样本(使用第二种方法),但是如果您的数据足够大,可以用Spark处理,则波动很小其实不重要。