如何使用 issplit() 在 foreach 循环中将完整数据集传递给一个工作人员并将特定子集传递给其他工作人员

问题描述 投票:0回答:1

我目前正在针对因子变量的每个级别的数据子集拟合一组模型。由于模型需要很长时间才能运行,因此我使用

foreach
doParallel
包使用 %dopar% 并行估计变量每个级别的模型集。我仅使用
iterators
包中的 isplit() 函数将数据子集传递给每个工作人员以避免内存问题。

现在,我的问题是如何扩展我的代码,以便在第一次迭代中,通过将完整数据集传递给其中一个工作人员,在整个数据集上估计模型。在接下来的迭代中,我只想将数据的子集传递给每个工作人员并估计模型。

我使用下面

mtcars
数据集的一些示例数据来说明我的问题。

假设我想通过汽车的气缸数(

gear
列)并行计算一辆汽车拥有的平均前进档数(
cyl
列)。

首先加载包并导入数据

library(doParallel)
library(foreach)
library(iterators)
library(dplyr)
#get sample data to illustrate problem  
data("mtcars")
df <- mtcars
df$cyl <- as.factor(df$cyl) #make cyl categorical 

接下来,迭代

cyl
列的每个级别并进行必要的计算

mycluster <- makeCluster(3)
registerDoParallel(mycluster)

result <- foreach(subset = isplit(df, df$cyl), .combine = "c", .packages = "dplyr") %dopar% {
  x <- summarise(subset$value, mean(gear, na.rm = T))
  return(x)
}
stopCluster(mycluster)

结果是一个列表,其中包含每种气缸数类别的平均齿轮数。

> result
$`mean(gear, na.rm = T)`
[1] 4.090909

$`mean(gear, na.rm = T)`
[1] 3.857143

$`mean(gear, na.rm = T)`
[1] 3.285714

现在,我想要扩展这段代码,这样我就有四次迭代。在第一次迭代中,我想将完整数据集传递给第一个工作人员,并计算整个数据集中包含的所有汽车的平均齿轮数。接下来,我想将

gear
每个级别的特定数据子集传递给其他工作人员,并计算平均齿轮数,如上所示。所以新的事情只是向 isplit() 语句添加一次迭代,我在其中传递完整的数据集。

预期输出:

> result
$`mean(gear, na.rm = T)` #average number of gears across all cars in dataset
[1] 3.6875

$`mean(gear, na.rm = T)`
[1] 4.090909

$`mean(gear, na.rm = T)`
[1] 3.857143

$`mean(gear, na.rm = T)`
[1] 3.285714

我知道这个例子很愚蠢,但它说明了我想要实现的目标。实际上,我使用一个非常大的数据集并估计了几个模型,每个模型都需要很长时间才能运行。然而,这些数据来自人口普查,所以我无法分享其中的几行。

r foreach iterator doparallel
1个回答
0
投票

如果您没有使用迭代器,这就像传递

subset = c(list(df), split(df, df$cyl))
一样简单。但是,您不能像这样连接迭代器列表。相反,让我们编写一个创建新型迭代器的函数。这将基于
iterators::isplit.data.frame()
。然而,不同之处在于,在第一次迭代时它将产生整个数据框:

isplit.extra <- function(x, f, drop = FALSE, ...) {
    first_iter <- TRUE
    it <- isplit(seq_len(nrow(x)), f, drop = drop, ...)
    nextEl <- function() {
        # On first iteration return the entire data frame
        if (first_iter) {
            first_iter <<- FALSE
            return(list(value = x, key = "all_data"))
        }
        # Otherwise split the data frame in the normal way
        i <- nextElem(it)
        list(value = x[i$value, , drop = FALSE], key = i$key)
    }
    structure(list(nextElem = nextEl), class = c("abstractiter", "iter"))
}

这是

isplit()
的方法,适用于具有类
extra
的对象。因此,我们将该类应用于您的
df
,并且我们不需要更改对
isplit()
的调用,因为它将调度适当的方法。我更改了您的
summarise()
调用以获得更有用的列名称。我已经标记了我更改的行。

class(df) <- c("extra", class(df)) # new
result <- foreach(
    subset = isplit(df, df$cyl), # the same
    .combine = "c", .packages = "dplyr"
) %dopar% {
    x <- summarise(
        subset$value,
        "cyl_{subset$key}" := mean(gear, na.rm = T) # changed
    )
    return(x)
}

输出为:

$cyl_all_data
[1] 3.6875

$cyl_4
[1] 4.090909

$cyl_6
[1] 3.857143

$cyl_8
[1] 3.285714
© www.soinside.com 2019 - 2024. All rights reserved.