我有传入的数据想要存储在数据库或其他磁盘上。数据看起来像这样
incoming_data <- function(ncol=5){
dat <- sample(1:10,100,replace = T) |> matrix(ncol = ncol) |> as.data.frame()
random_names <- sapply(1:ncol(dat),\(x) paste0(sample(letters,1), sample(1:100,1)))
colnames(dat) <- random_names
dat
}
incoming_data()
这个
incoming_data
只是举例..
实际上,一组 incoming_data
会有多个 5k rows
,大约有 50k columns
。整个最终文件约为 200-400 gigabytes
我的问题是如何将新数据作为列添加到数据库中而不将文件加载到 RAM 中
# your way
path <- "D:\\R_scripts\\new\\duckdb\\data\\DB.duckdb"
library(duckdb)
con <- dbConnect(duckdb(), dbdir = path, read_only = FALSE)
# write one piece of data in DB
dbWriteTable(con, "my_dat", incoming_data())
#### how to make something like this ####
my_dat <- cbind("my_dat", incoming_data())
假设传入批次的数据中行数保持不变,您可以使用
positonal join
(here)来实现您想要的:
library(duckdb)
library(DBI)
library(purrr)
incoming_data <- function(ncol=5){
dat <- sample(1:10,100,replace = T) |> matrix(ncol = ncol) |> as.data.frame()
random_names <- sapply(1:ncol(dat),\(x) paste0(sample(letters,1), sample(1:100,1)))
colnames(dat) <- random_names
dat
}
# Generate batches of data of
data_to_join <- rep(list(incoming_data()), 5)
# let's create some files with data
tmp_dir <- tempdir()
data_dir <- paste0(tmp_dir, "/data")
dir.create(data_dir)
walk2(
data_to_join,
seq_len(length(data_to_join)),
\(x, i) ({
file_out <- paste0(data_dir, "/", i,".csv")
write.csv(x, file_out, row.names = FALSE, quote = FALSE)
})
)
csv_files <- list.files(data_dir, full.names = TRUE)
con <- dbConnect(duckdb(), read_only = FALSE)
# write first columns to duckdb instance
duckdb_read_csv(con, "my_dat", csv_files[1])
# Recursively add new columns by self joining with new columns from file.
walk(csv_files[-1],
\(file) ({
create_query <- sprintf(
"CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat positional join read_csv_auto('%s');",
csv_files[n]
)
dbSendQuery(con, create_query)
})
)
dbReadTable(con, "my_dat")
# Disconnect from connection
dbDisconnect(con, shutdown = TRUE)
对于每批新传入的数据,您可以运行上面的
create or replace statement
将新列绑定到现有数据;
您还可以调整它以使用 r 对象更新表:
# Generate batches of data of
data_to_join <- rep(list(incoming_data()), 5)
con <- dbConnect(duckdb(), read_only = FALSE)
# write first iteration
dbWriteTable(con, "my_dat", data_to_join[[1]])
# Recursively add new columns by self joining with new columns from each available data
walk(
data_to_join[-1],
\(x) ({
dbWriteTable(con, "tmp_tbl", x, overwrite = TRUE, temporary = TRUE)
dbSendQuery(
con,
"CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat positional join tmp_tbl;"
)
dbRemoveTable(con, "tmp_tbl")
})
)
dbReadTable(con, "my_dat")
# Disconnect from connection
dbDisconnect(con, shutdown = TRUE)
关于您的问题,如何在不将文件加载到内存中的情况下执行此过程:根据我的经验,直接将文件加载到 duckdb 中而不将它们加载到 R 中应该是这里的最佳实践,并且原则上会避免该问题。
您可能需要打开和关闭每个加载文件的连接,以避免 R 会话崩溃,但这可能是我在本地遇到的一个奇怪的问题,可能不会转化为这里的问题。
我希望最终能有所帮助:)