按列将新数据添加到duckdb内存不足

问题描述 投票:0回答:1

我有传入的数据想要存储在数据库或其他磁盘上。数据看起来像这样

incoming_data <- function(ncol=5){
  dat <- sample(1:10,100,replace = T) |> matrix(ncol = ncol) |> as.data.frame()
  random_names <- sapply(1:ncol(dat),\(x) paste0(sample(letters,1), sample(1:100,1)))
  colnames(dat) <- random_names
  dat
}


incoming_data()

这个

incoming_data
只是举例.. 实际上,一组
incoming_data
会有多个
5k rows
,大约有
50k columns
。整个最终文件约为
200-400 gigabytes

我的问题是如何将新数据作为列添加到数据库中而不将文件加载到 RAM 中

# your way
path <- "D:\\R_scripts\\new\\duckdb\\data\\DB.duckdb"
library(duckdb)
con <- dbConnect(duckdb(), dbdir = path, read_only = FALSE)
#  write one piece of data in DB
dbWriteTable(con, "my_dat", incoming_data())


#### how to make something like this ####
my_dat <- cbind("my_dat", incoming_data())
r cbind duckdb
1个回答
0
投票

假设传入批次的数据中行数保持不变,您可以使用

positonal join
here)来实现您想要的:

library(duckdb)
library(DBI)
library(purrr)

incoming_data <- function(ncol=5){
  dat <- sample(1:10,100,replace = T) |> matrix(ncol = ncol) |> as.data.frame()
  random_names <- sapply(1:ncol(dat),\(x) paste0(sample(letters,1), sample(1:100,1)))
  colnames(dat) <- random_names
  dat
}

# Generate batches of data of 
data_to_join <- rep(list(incoming_data()), 5)

# let's create some files with data
tmp_dir <- tempdir()
data_dir <- paste0(tmp_dir, "/data")
dir.create(data_dir)

walk2(
  data_to_join,
  seq_len(length(data_to_join)), 
  \(x, i) ({
    file_out <- paste0(data_dir, "/", i,".csv")
    write.csv(x, file_out, row.names = FALSE, quote = FALSE)
  })
)

csv_files <- list.files(data_dir, full.names = TRUE)

con <- dbConnect(duckdb(), read_only = FALSE)

# write first columns to duckdb instance
duckdb_read_csv(con, "my_dat", csv_files[1])

# Recursively add new columns by self joining with new columns from file.
walk(csv_files[-1], 
     \(file) ({
       create_query <- sprintf(
         "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat positional join read_csv_auto('%s');", 
         csv_files[n]
       )
       dbSendQuery(con, create_query)
     })
) 

dbReadTable(con, "my_dat")


# Disconnect from connection
dbDisconnect(con, shutdown = TRUE)

对于每批新传入的数据,您可以运行上面的

create or replace statement
将新列绑定到现有数据;

您还可以调整它以使用 r 对象更新表:

# Generate batches of data of 
data_to_join <- rep(list(incoming_data()), 5)

con <- dbConnect(duckdb(), read_only = FALSE)

# write first iteration
dbWriteTable(con, "my_dat", data_to_join[[1]])

# Recursively add new columns by self joining with new columns from each available data 
walk(
  data_to_join[-1], 
     \(x) ({
       dbWriteTable(con, "tmp_tbl", x, overwrite = TRUE, temporary = TRUE)
       dbSendQuery(
         con, 
         "CREATE OR REPLACE TABLE my_dat AS SELECT * FROM my_dat positional join tmp_tbl;"
        )
       dbRemoveTable(con, "tmp_tbl")
     })
)

dbReadTable(con, "my_dat")
# Disconnect from connection
dbDisconnect(con, shutdown = TRUE)

关于您的问题,如何在不将文件加载到内存中的情况下执行此过程:根据我的经验,直接将文件加载到 duckdb 中而不将它们加载到 R 中应该是这里的最佳实践,并且原则上会避免该问题。

您可能需要打开和关闭每个加载文件的连接,以避免 R 会话崩溃,但这可能是我在本地遇到的一个奇怪的问题,可能不会转化为这里的问题。

我希望最终能有所帮助:)

© www.soinside.com 2019 - 2024. All rights reserved.