sparklyr spark_apply非常慢/完全没有响应。签入spark UI时,正在执行的阶段在utils.scala:204处收集。它正在执行0/1(1个正在运行)任务。应用了spark_apply的数据帧具有30个分区。此任务没有进展,为什么要执行单个任务]
library(sparklyr)
library(dplyr)
config=spark_config()
config=c(config, list("spark.files"="hdfs:///bundle/packages.tar","spark.dynamicAllocation.enabled"="false","spark.executor.memory"="10g","spark.executor.cores"="4","spark.executor.instances"="7"))
sc <- spark_connect(master="yarn", app_name = "demo",config = config,version="2.3.0")
demo_data <- spark_read_csv(sc,name='demo_data',path = '/data.txt',delimiter = '\t',infer_schema = FALSE, columns = list(column1 = "integer"))
spark_apply(demo_data, function(df) df * 10, packages = "packages.tar" ,columns=list(column1="integer"))
在书中“用R掌握火花”即使使用as a last resort,也使用spark_apply following explainations进行更高级的luraschi命令表明,已经投入了大量资金来克服瓶颈,尤其是箭头库的开发。
在github上,他解释说spark_apply遇到序列化问题,如here和here所述
另一方面,randomgambit认为性能问题是由于Sparklyr行为复制了每个节点上的整个R分布。
就我而言,问题不在本地模式下出现,但是在集群模式下首次执行spark_apply期间:
使用rstudio.com中的测试命令基准测试可提供以下性能来运行此表达式
sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
本地模式:
Unit: seconds
min lq mean median uq max neval
5.043947 5.043947 5.043947 5.043947 5.043947 5.043947 1
集群模式(一位主管,一位工人):
Unit: seconds
min lq mean median uq max neval
928.0637 928.0637 928.0637 928.0637 928.0637 928.0637 1
Unit: seconds
min lq mean median uq max neval
4.309775 4.309775 4.309775 4.309775 4.309775 4.309775 1
鉴于第二次执行要快得多,我相信Sparklyr在每个节点上复制整个R分布需要923秒= 15分钟,23秒。
这里是使用的代码:
library(dplyr)
library(sparklyr)
library(microbenchmark)
sc <- spark_connect(master = "local")
microbenchmark(
sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
,times = 1L)
conf <- spark_config()
conf[["spark.r.command"]] <- "d:/path_to/R-3.6.1/bin/Rscript.exe"
sc <- spark_connect(master="spark://192.168.0.12:7077",
version = "2.4.3",
spark_home = "C:\\Users\\username\\AppData\\Local\\spark\\spark-2.4.3-bin-hadoop2.7",
config = conf)
microbenchmark(
sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
,times = 1L)
microbenchmark(
sdf_len(sc, 5, repartition = 1) %>% spark_apply(function(e) I(e))
,times = 1L)