我有一个
{targets}
管道,可以下载数千个气象站数据。为了有效地做到这一点,我使用动态分支和 crew
进行并行工作,并以 station-id 作为模式。
仅每个站的数据就很小(10k 天)。但下载所有数据后,我需要对每个数据进行分析。因此,我添加了一个使用 station-id 作为模式的目标,并将过滤后的数据集传递给函数。这导致我的机器崩溃(我有 32Gb 内存)。每个工作人员都消耗大量数据,就好像每个工作人员正在接收所有气象站的完整副本一样。
有没有一种方法可以在一个大对象上进行分支,而不必将其全部加载到内存中?我想过使用
tar_group
但这依赖于 dplyr::group_by
并且我的整个工作流程使用 data.table
...
这是我正在尝试做的事情的代表
library(targets)
tar_script({
tar_option_set(
controller = crew::crew_controller_local(workers = 2),
memory = "transient",
garbage_collection = TRUE,
storage = "worker"
)
library(crew)
tar_pipeline(
tar_target(ids,
1:100),
tar_target(station_data,
data.frame(id = ids,
val = runif(1000000)),
pattern = map(ids)),
tar_target(corrected_data,
sort(station_data[station_data$id == ids, 'val']),
pattern = ids)
)
},
ask = FALSE
)
tar_make()
看here我发现了与大对象相关的类似问题。 Landau 的建议是在
format = 'file'
中使用 {targets}
。这样我就可以控制和加载整个数据集的选定部分。
为了测试,我使用
format = file
为每个目标输出重写了上述表示,并且我的 RAM 使用率很低。所以它有效!
波纹管是修改后的表示。请注意,这将在您的磁盘中产生近 1Gb 的文件。
library(targets)
tar_script({
tar_option_set(
controller = crew::crew_controller_local(workers = 2),
memory = "transient",
garbage_collection = TRUE,
storage = "worker"
)
library(crew)
tar_pipeline(
tar_target(ids,
1:100),
tar_target(station_data_file,
{
arq <- file.path('data', paste0('input_', ids, '.RDS'))
saveRDS(data.frame(id = ids,
val = runif(1000000)), arq)
return(arq)
},
format = 'file',
pattern = map(ids)),
tar_target(corrected_data_file,
{
dt_in <- readRDS(station_data_file)
dt_out <- sort(dt_in$val)
arq_o <- sub('input', 'output', station_data_file)
saveRDS(dt_out, arq_o)
return(arq_o)
},
format = 'file',
pattern = map(station_data_file))
)
},
ask = FALSE
)
tar_make()