当将terior_epred与foreach()并行时,有没有办法导出BRMS模型中使用的函数?

问题描述 投票:0回答:1

我正在根据大量新数据预测 BRMS 分类模型。当我尝试使用整个数据集进行预测时,R 崩溃或耗尽向量内存。为了解决这个问题,我想循环遍历我的新数据并使用

foreach()
加快速度。但是,即使我在
foreach()
循环中重新定义函数,我似乎也无法导出用于缩放原始模型中的数据的函数。我不确定我是否完全理解 BRMS 公式中的函数如何存储在模型中,这可能就是我难以导出它的原因。

这基本上是我的代码的样子:

test_data <- data.frame(category=c(rep("a",10),
                                   rep("b",10),
                                   rep("c",10)),
                        var1 = runif(30,min=0,max=20))

scale01 <- function(x) (x-min(x))/(max(x)-min(x))

test_brm <- brm(category ~ scale01(var1),
                family=categorical(refcat="a"),
                data=test_data)

prediction_data <- data.frame(id=rep(c(1:500),2),
                              var1=runif(1000,min=0,max=20))

cl <- makeCluster(parallel::detectCores())
registerDoSNOW(cl)

preds <- foreach(id = prediction_data$id,.combine=rbind,
                 packages=c("brms","matrixStats")) %dopar% {
                   
                   scale01 <- function(x) (x-min(x))/(max(x)-min(x))
                   
                   colMeans(posterior_epred(test_brm,
                                            newdata=prediction_data[which(prediction_data$id==id),]))
                   
                 }

我得到的错误是

Error in { : task 1 failed - "could not find function "scale01""

我知道我使用的缩放函数依赖于每次都有相同的最大值和最小值,但我的新数据确实如此(也许可以对其进行硬编码,但不认为这是这里的问题)。

有什么建议吗?谢谢!

r foreach parallel-processing brms
1个回答
0
投票

问题在于,当公式导出到并行工作程序时,

scale01()
不再与公式
category ~ scale01(var1)
关联。这与他们都生活在全球环境中有关,而全球环境永远不会输出到并行工作者。要解决这个问题,您可以将
scale01()
设为与
brmsfit
对象相同的本地环境的一部分,如下所示。

library(foreach)
library(doParallel)
library(brms)

test_data <- data.frame(category=c(rep("a",10),
                                   rep("b",10),
                                   rep("c",10)),
                        var1 = runif(30,min=0,max=20))

test_brm <- local({
  scale01 <- function(x) (x-min(x))/(max(x)-min(x))
  
  brm(category ~ scale01(var1),
      family = categorical(refcat="a"),
      data = test_data)
})                 

prediction_data <- data.frame(
  id = rep(c(1:500), times = 2),
  var1 = runif(1000, min = 0, max = 20)
)

cl <- makeCluster(parallelly::availableCores())
registerDoParallel(cl)

preds <- foreach(id = prediction_data$id, .combine = rbind, .packages = c("brms", "matrixStats")) %dopar% {
  data <- prediction_data[which(prediction_data$id == id), ]
  colMeans(posterior_epred(test_brm, newdata=data))
}

parallel::stopCluster(cl)

这里我将 doSNOW 替换为 doParallel。您不想依赖 doSNOW,因为它依赖于 snow 包,它现在被认为是遗留包。 parallel 软件包是所有 R 安装的一部分,自 2014 年起取代了 snow 的角色。

另外,重要的是,不要默认为

detectCores()
。它可能会对共享计算机造成严重破坏。相反,请使用
parallelly::availableCores()
。有关详细信息,请参阅https://www.jottr.org/2022/12/05/avoid-detectcores/

现在,作为 Futureverse 的作者,我还应该指出以下实现相同功能的替代代码,同时为您自动化许多细节。它还支持并行工作人员的打印输出、正确的错误处理等,如果您遇到其他问题,这些都会为您提供帮助。

library(foreach)
library(doFuture)
library(brms)

test_data <- data.frame(category=c(rep("a",10),
                                   rep("b",10),
                                   rep("c",10)),
                        var1 = runif(30,min=0,max=20))

test_brm <- local({
  scale01 <- function(x) (x-min(x))/(max(x)-min(x))
  
  brm(category ~ scale01(var1),
      family = categorical(refcat="a"),
      data = test_data)
})                 

prediction_data <- data.frame(
  id = rep(c(1:500), times = 2),
  var1 = runif(1000, min = 0, max = 20)
)

plan(multisession)

preds <- foreach(id = prediction_data$id, .combine = rbind) %dofuture% {
  data <- prediction_data[which(prediction_data$id == id), ]
  colMeans(posterior_epred(test_brm, newdata=data))
}

plan(sequential)

顺便说一句,您可能希望设置一个“迭代器”(请参阅

iterators
包)来在主 R 会话中进行子集化,而不是在每次迭代中对 data 进行子集化。这将避免将整个
prediction_data
对象发送给每个并行工作线程,这可能会很昂贵,特别是对于数据集。

© www.soinside.com 2019 - 2024. All rights reserved.