我正在尝试了解 R 中的异步编程 - 我希望用户能够使用输入下拉列表创建“场景”,从中生成表格,然后用户可以单击表格中的一行并选择“操作” “一个函数 - 简化的情况是等待设定的秒数(来自用户输入),然后更改状态以完成:这是我一直在研究的一个示例,但我正在努力拥有我的多个版本如果我“操作”一个场景,然后另一个场景,第一个场景停止,函数就会运行 - 关于如何更改代码以在后台并行运行的任何想法?
这是我的代码:
library(shiny)
library(DT)
library(callr)
# Define server logic
server <- function(input, output, session) {
run_task <- function(sleep_duration) {
Sys.sleep(sleep_duration)
}
# Define reactiveValues to store data
data <- reactiveValues(table_data = NULL)
bg_proc <- reactiveVal(NULL)
check_finished <- reactiveVal(FALSE)
row_index <- reactiveVal(NULL)
observeEvent(input$add, {
# Generate table based on user's selection
# Append new selection to existing data
new_data <- data.frame(
Dropdown1 = input$dropdown1,
Dropdown2 = input$dropdown2,
Dropdown3 = input$dropdown3,
Status = "Pending"
)
if (is.null(data$table_data)) {
data$table_data <- new_data
} else {
data$table_data <- rbind(data$table_data, new_data)
}
})
observeEvent(input$action, {
# Get the index of the selected row
row_index(input$output_table_rows_selected)
if (length(row_index()) == 0) {
return() # No row selected, do nothing
}
# Execute the time-consuming operation in the background using callr::r_bg()
p <- r_bg(
func = function(run_task, sleep_duration) {
return(run_task(sleep_duration))
},
supervise = TRUE,
args = list(
run_task = run_task,
sleep_duration = as.numeric(data$table_data$Dropdown2[row_index()])
)
)
# Set the status of the selected row to "Completed" after operation completes
bg_proc(p)
check_finished(TRUE)
cat(paste0("\nStart at ", Sys.time()))
})
observe({
req(check_finished())
invalidateLater(1000)
# Set the status of the selected row to "In Progress"
data$table_data$Status[row_index()] <- "In Progress"
cat(paste0("\nStill busy at ", Sys.time()))
p <- bg_proc()
if (p$is_alive() == FALSE) {
check_finished(FALSE)
bg_proc(NULL)
data$table_data$Status[row_index()] <- "Completed"
cat(paste0("\nFINISHED ", Sys.time()))
}
})
observeEvent(input$deleteRows, {
# Delete selected rows
if (!is.null(input$output_table_rows_selected)) {
data$table_data <- data$table_data[-as.numeric(input$output_table_rows_selected), ]
}
})
# Render table
output$output_table <- renderDT({
data$table_data
})
}
# Define UI
ui <- fluidPage(
titlePanel("Dashboard"),
sidebarLayout(
sidebarPanel(
selectInput("dropdown1", "Dropdown 1", choices = c("Option 1", "Option 2", "Option 3")),
selectInput("dropdown2", "Duration of Sleep", choices = c("1", "10", "60")),
selectInput("dropdown3", "Dropdown 3", choices = c("Option X", "Option Y", "Option Z")),
actionButton("add", "Add"),
actionButton("edit", "Edit"),
actionButton("deleteRows", "Delete Rows"),
actionButton("action", "Action")
),
mainPanel(
DTOutput("output_table")
)
)
)
# Run the application
shinyApp(ui = ui, server = server)
让应用程序正常工作。
您需要维护所有进程的列表(或此类列表)。目前您只查看最后一个。这是一个稍微重构了代码的解决方案:
tibble
中。为了识别进程,我们在启动进程后立即分配一个唯一的名称(这是稳定的,而不是仅仅查看行索引)tibble
即可。library(shiny)
library(DT)
library(callr)
library(dplyr)
library(purrr)
library(cli)
# Define server logic
server <- function(input, output, session) {
run_task <- function(sleep_duration) {
Sys.sleep(sleep_duration)
}
# Define reactiveValues to store data
data <- reactiveValues(table_data = NULL)
bg_procs <- reactiveVal(tibble(name = character(0),
proc = vector("list", 0),
finished = logical(0)))
observeEvent(input$add, {
id <- paste0("job_", input$add)
# Generate table based on user's selection
# Append new selection to existing data
new_data <- data.frame(
id = id,
Dropdown1 = input$dropdown1,
Dropdown2 = input$dropdown2,
Dropdown3 = input$dropdown3,
Status = "Pending"
)
if (is.null(data$table_data)) {
data$table_data <- new_data
} else {
data$table_data <- bind_rows(data$table_data, new_data)
}
})
observeEvent(input$action, {
# Get the index of the selected row
row_index <- req(input$output_table_rows_selected)
req(data$table_data[row_index, "Status"] == "Pending")
# Execute the time-consuming operation in the background using callr::r_bg()
p <- r_bg(
func = function(run_task, sleep_duration) {
return(run_task(sleep_duration))
},
supervise = TRUE,
args = list(
run_task = run_task,
sleep_duration = as.numeric(data$table_data$Dropdown2[row_index])
)
)
nm <- data$table_data$id[row_index]
bg_procs(
bg_procs() %>%
bind_rows(tibble(name = nm,
proc = list(p),
finished = FALSE))
)
cli_alert_warning("Proc '{nm}' at row <{row_index}> started at {Sys.time()}")
})
observe({
## check if there are unfinished tasks
procs <- bg_procs()
stati <- procs$finished
req(length(stati) > 0)
data$table_data <- isolate(data$table_data) %>%
left_join(procs, c(id = "name")) %>%
mutate(id,
Dropdown1,
Dropdown2,
Dropdown3,
Status = case_when(
is.na(finished) ~ "Pending",
finished ~ "Completed",
!finished ~ "In Progress"),
.keep = "none")
req(!all(stati))
invalidateLater(1000)
running <- map_lgl(procs$proc, ~ .x$is_alive())
now_finished <- !running & !stati
## status message
if (any(running)) {
msg <- paste("{qty(sum(running))} Process{?es}",
"{procs$name[running]}",
"still busy at {Sys.time()}")
cli_alert_info(msg)
}
if (any(now_finished)) {
msg <- paste("{qty(sum(now_finished))} Process{?es}",
"{procs$name[now_finished]}",
"FINSIHED at {Sys.time()}")
cli_alert_success(msg)
}
## update tables
bg_procs(procs %>%
mutate(finished = !running))
})
observeEvent(input$deleteRows, {
# Delete selected rows
if (!is.null(input$output_table_rows_selected)) {
idx <- as.numeric(input$output_table_rows_selected)
data$table_data <- data$table_data[-idx, ]
}
})
# Render table
output$output_table <- renderDT({
data$table_data
}, selection = "single")
}