如何在mclapply中重用sparklyr上下文 ?

问题描述 投票:0回答:1

我有一个R代码,该代码在sparklyr中进行了一些分布式数据预处理,然后将数据收集到R本地数据帧中,最后将结果保存在CSV中。一切都按预期工作,现在我计划在多个输入文件处理之间重用spark上下文。

我的代码看起来类似于此可复制的示例:

library(dplyr)
library(sparklyr)

sc <- spark_connect(master = "local")

# Generate random input
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df0.csv')
matrix(rbinom(1000, 1, .5), ncol=1) %>% write.csv('/tmp/input/df1.csv')

# Multi-job input
input = list(
    list(name="df0", path="/tmp/input/df0.csv"),
    list(name="df1", path="/tmp/input/df1.csv")
)
global_parallelism = 2
results_dir = "/tmp/results2"

# Function executed on each file
f <- function (job) {
    spark_df <- spark_read_csv(sc, "df_tbl", job$path)
    local_df <- spark_df %>% 
      group_by(V1) %>%           
      summarise(n=n()) %>%
      sdf_collect

    output_path <- paste(results_dir, "/", job$name, ".csv", sep="")
    local_df %>% write.csv(output_path)
    return (output_path)
}

如果我用lapply顺序执行作业输入的功能,那么一切都会按预期进行:

> lapply(input, f)

[[1]]
[1] "/tmp/results2/df0.csv"

[[2]]
[1] "/tmp/results2/df1.csv"

但是,如果我打算并行运行它以最大程度地使用spark上下文(如果df0 spark处理已完成并且本地R正在对其进行处理,df1可能已经被spark处理了]:

> library(parallel)
> library(MASS)
> mclapply(input, f, mc.cores = global_parallelism)

 *** caught segfault ***
address 0x560b2c134003, cause 'memory not mapped'
[[1]]
[1] "Error in as.vector(x, \"list\") : \n  cannot coerce type 'environment' to vector of type 'list'\n"
attr(,"class")
[1] "try-error"
attr(,"condition")
<simpleError in as.vector(x, "list"): cannot coerce type 'environment' to vector of type 'list'>

[[2]]
NULL

Warning messages:
1: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 2 did not deliver a result, all values of the job will be affected
2: In mclapply(input, f, mc.cores = global_parallelism) :
  scheduled core 1 encountered error in user code, all values of the job will be affected

[当我使用Python和ThreadPoolExcutor做类似操作时,火花上下文在线程之间共享,Scala和Java相同。

是否有可能在R中的并行执行中重用sparklyr上下文​​?

r apache-spark parallel-processing sparklyr
1个回答
0
投票

[是,很遗憾,类别为scspark_connection对象无法导出到另一个R进程(即使使用了分叉处理)。如果您使用的是future.apply生态系统的一部分future软件包,则可以使用以下命令查看此内容:

library(future.apply)
plan(multicore)

## Look for non-exportable objects and given an error if found
options(future.globals.onReference = "error")

y <- future_lapply(input, f)

将抛出:

Error: Detected a non-exportable reference (‘externalptr’) in one of the
globals (‘sc’ of class ‘spark_connection’) used in the future expression
© www.soinside.com 2019 - 2024. All rights reserved.