我有一个R代码,该代码在sparklyr
中进行了一些分布式数据预处理,然后将数据收集到R本地数据帧中,最后将结果保存在CSV中。一切都按预期工作,现在我计划在多个输入文件处理之间重用spark上下文。
我的代码看起来类似于此可复制的示例:
library(dplyr)
library(sparklyr)
sc <- spark_connect(master = "local")
# Generate random input
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df0.csv')
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df1.csv')
# Multi-job input
input = list(
list(name="df0", path="/tmp/input/df0.csv"),
list(name="df1", path="/tmp/input/df1.csv")
)
global_parallelism = 2
results_dir = "/tmp/results2"
# Function executed on each file
f <- function (job) {
spark_df <- spark_read_csv(sc, "df_tbl", job$path)
local_df <- spark_df %>%
group_by(V1) %>%
summarise(n=n()) %>%
sdf_collect
output_path <- paste(results_dir, "/", job$name, ".csv", sep="")
local_df %>% write.csv(output_path)
return (output_path)
}
如果我用lapply
顺序执行作业输入的功能,那么一切都会按预期进行:
> lapply(input, f)
[[1]]
[1] "/tmp/results2/df0.csv"
[[2]]
[1] "/tmp/results2/df1.csv"
但是,如果我打算并行运行它以最大程度地使用spark上下文(如果df0
spark处理已完成并且本地R正在对其进行处理,df1
可能已经被spark处理了]:
> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)
*** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>
[[2]]
NULL
Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
scheduled core 1 encountered error in user code, all values of the job will be affected
[当我使用Python和ThreadPoolExcutor
做类似操作时,火花上下文在线程之间共享,Scala和Java相同。
是否有可能在R中的并行执行中重用sparklyr上下文?
[是,很遗憾,类别为sc
的spark_connection
对象无法导出到另一个R进程(即使使用了分叉处理)。如果您使用的是future.apply生态系统的一部分future软件包,则可以使用以下命令查看此内容:
library(future.apply)
plan(multicore)
## Look for non-exportable objects and given an error if found
options(future.globals.onReference = "error")
y <- future_lapply(input, f)
将抛出:
Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression