在 Shiny 中使用 callr 以用户传入的参数运行后台函数

问题描述 投票:0回答:1

我正在尝试了解 R 中的异步编程。我希望用户能够通过输入下拉列表创建“场景”并据此生成表格,然后用户可以单击表格中的某一行,并选择对该行“执行(Action)”一个函数。简化后的情形是:等待用户输入所设定的秒数,然后将状态更改为“已完成(Completed)”。下面是我一直在研究的一个示例,但我无法让该函数的多个实例同时运行:如果我先对一个场景执行“Action”,再对另一个场景执行,第一个场景就会停止。关于如何修改代码、使这些任务在后台并行运行,有什么想法吗?

这是我的代码:

library(shiny)
library(DT)
library(callr)

# Shiny server logic for the scenario dashboard.
#
# Wires up four behaviours: "Add" appends a scenario row built from the three
# dropdowns, "Action" launches a callr background process for the selected
# row, a polling observer updates that row's Status, and "Delete Rows"
# removes the selected rows.
#
# NOTE: only one process handle (`bg_proc`) and one row index (`row_index`)
# are stored, and both are overwritten on every "Action" click, so the
# polling observer only ever follows the most recently started job.
server <- function(input, output, session) {
  
  # Stand-in for the long-running work: blocks for `sleep_duration` seconds.
  run_task <- function(sleep_duration) {
    Sys.sleep(sleep_duration)
  }
  
  # Reactive state shared by the observers below.
  # `data$table_data` is the scenario table (NULL until the first "Add").
  data <- reactiveValues(table_data = NULL)
  # Handle of the single background process that is currently being polled.
  bg_proc <- reactiveVal(NULL)
  # TRUE while the polling observer should keep checking `bg_proc`.
  check_finished <- reactiveVal(FALSE)
  # Row index captured by the most recent "Action" click.
  row_index <- reactiveVal(NULL)
  
  observeEvent(input$add, {
    # Build a one-row data frame from the current dropdown selections;
    # every new scenario starts out as "Pending".
    new_data <- data.frame(
      Dropdown1 = input$dropdown1,
      Dropdown2 = input$dropdown2,
      Dropdown3 = input$dropdown3,
      Status = "Pending"
    )
    # The first row initialises the table; later rows are appended to it.
    if (is.null(data$table_data)) {
      data$table_data <- new_data
    } else {
      data$table_data <- rbind(data$table_data, new_data)
    }
  })
  
  observeEvent(input$action, {
    # Remember which row is selected; this replaces any previously stored index
    row_index(input$output_table_rows_selected)
    
    if (length(row_index()) == 0) {
      return()  # No row selected, do nothing
    }
    
    # Start the task in a separate R process via callr::r_bg(). `run_task` is
    # handed over as an argument because the child process does not see this
    # function's local variables; the sleep duration comes from the row's
    # Dropdown2 value converted to a number.
    p <- r_bg(
      func = function(run_task, sleep_duration) {
        return(run_task(sleep_duration))
      },
      supervise = TRUE,
      args = list(
        run_task = run_task,
        sleep_duration = as.numeric(data$table_data$Dropdown2[row_index()])
      )
    )
    # Store the handle (replacing any earlier one) and switch polling on;
    # the Status column itself is updated by the polling observer below.
    bg_proc(p)
    check_finished(TRUE)
    cat(paste0("\nStart at ", Sys.time()))
  })
  
  # Polling observer: does nothing until `check_finished` is TRUE, then
  # re-runs about once per second (invalidateLater(1000)) until the stored
  # process has exited.
  observe({
    req(check_finished())
    invalidateLater(1000)
    # Mark the tracked row as "In Progress" on every poll
    data$table_data$Status[row_index()] <- "In Progress"
    cat(paste0("\nStill busy at ", Sys.time()))
    p <- bg_proc()
    # Once the process is no longer alive: stop polling, drop the handle and
    # mark the tracked row as "Completed".
    if (p$is_alive() == FALSE) {
      check_finished(FALSE)
      bg_proc(NULL)
      data$table_data$Status[row_index()] <- "Completed"  
      cat(paste0("\nFINISHED ", Sys.time()))
    }
  })
  
  observeEvent(input$deleteRows, {
    # Drop the currently selected rows (negative indexing removes them)
    if (!is.null(input$output_table_rows_selected)) {
      data$table_data <- data$table_data[-as.numeric(input$output_table_rows_selected), ]
    }
  })
  
  # Render the scenario table; it re-renders whenever `data$table_data` changes
  output$output_table <- renderDT({
    data$table_data
  })
}

# User interface: a sidebar holding the three scenario dropdowns and the
# Add / Edit / Delete Rows / Action buttons, next to the scenario table.
ui <- fluidPage(
  titlePanel(title = "Dashboard"),
  sidebarLayout(
    sidebarPanel = sidebarPanel(
      # Scenario inputs; "dropdown2" supplies the sleep duration in seconds.
      selectInput(
        inputId = "dropdown1",
        label = "Dropdown 1",
        choices = c("Option 1", "Option 2", "Option 3")
      ),
      selectInput(
        inputId = "dropdown2",
        label = "Duration of Sleep",
        choices = c("1", "10", "60")
      ),
      selectInput(
        inputId = "dropdown3",
        label = "Dropdown 3",
        choices = c("Option X", "Option Y", "Option Z")
      ),
      # Table actions.
      actionButton(inputId = "add", label = "Add"),
      actionButton(inputId = "edit", label = "Edit"),
      actionButton(inputId = "deleteRows", label = "Delete Rows"),
      actionButton(inputId = "action", label = "Action")
    ),
    mainPanel = mainPanel(
      DTOutput(outputId = "output_table")
    )
  )
)

# Launch the app by pairing the UI definition with the server function.
shinyApp(ui, server)

让应用程序正常工作。

r asynchronous shiny
1个回答
0
投票

您需要维护一个包含所有进程的列表(或类似的数据结构)。目前您只跟踪了最后一个进程。下面是一个对代码稍作重构的解决方案:

  1. 我们将所有后台进程的相关信息保存在一个 `tibble` 中。为了识别进程,我们在启动进程后立即为其分配一个唯一的名称(该名称是稳定的,比仅依赖行索引更可靠)。
  2. 创建新进程时,只需将其添加到该 `tibble` 中即可。
  3. 最后,我们定期遍历所有进程;如果还有未完成的进程,就打印一条日志消息并安排下一次检查。
library(shiny)
library(DT)
library(callr)
library(dplyr)
library(purrr)
library(cli)

# Shiny server logic that runs one callr background process per scenario row.
#
# Every table row carries a stable `id`, and every launched process is
# recorded in the `bg_procs` tibble under that id, so any number of jobs can
# run side by side and be polled together. Processes are killed when their
# row is deleted or when the session ends.
#
# @param input,output,session Standard Shiny server arguments.
server <- function(input, output, session) {

  # Stand-in for the long-running work: blocks for `sleep_duration` seconds.
  run_task <- function(sleep_duration) {
    Sys.sleep(sleep_duration)
  }

  # Scenario table shown to the user (NULL until the first row is added).
  data <- reactiveValues(table_data = NULL)
  # One row per launched process: `name` matches the table's `id` column,
  # `proc` holds the callr process handle, `finished` the last polled state.
  bg_procs <- reactiveVal(tibble(name = character(0),
                                 proc = vector("list", 0),
                                 finished = logical(0)))

  # Nothing else stops the background processes when the user disconnects,
  # so kill whatever is still tracked. `$kill()` is harmless on a process
  # that has already exited.
  session$onSessionEnded(function() {
    walk(isolate(bg_procs())$proc, ~ .x$kill())
  })

  observeEvent(input$add, {
    # `input$add` is the button's click counter, so the id is unique per row
    # and stays valid when other rows are deleted (unlike a row index).
    id <- paste0("job_", input$add)
    new_data <- data.frame(
      id = id,
      Dropdown1 = input$dropdown1,
      Dropdown2 = input$dropdown2,
      Dropdown3 = input$dropdown3,
      Status = "Pending"
    )
    # The first row initialises the table; later rows are appended to it.
    if (is.null(data$table_data)) {
      data$table_data <- new_data
    } else {
      data$table_data <- bind_rows(data$table_data, new_data)
    }
  })

  observeEvent(input$action, {
    # Need a selected row that has not been started yet.
    row_index <- req(input$output_table_rows_selected)
    req(data$table_data$Status[row_index] == "Pending")
    # Run the task in a separate R process. `run_task` is handed over as an
    # argument because the child process cannot see this function's locals.
    # NOTE(review): `run_task` is a closure created inside `server`, so its
    # enclosing environment travels with it -- confirm that stays cheap.
    p <- r_bg(
      func = function(run_task, sleep_duration) {
        return(run_task(sleep_duration))
      },
      supervise = TRUE,
      args = list(
        run_task = run_task,
        sleep_duration = as.numeric(data$table_data$Dropdown2[row_index])
      )
    )
    # Register the process under the row's id; this change to `bg_procs`
    # also (re)starts the polling observer below.
    nm <- data$table_data$id[row_index]
    bg_procs(
      bg_procs() %>%
        bind_rows(tibble(name = nm,
                         proc = list(p),
                         finished = FALSE))
    )
    cli_alert_warning("Proc '{nm}' at row <{row_index}> started at {Sys.time()}")
  })

  # Polling observer: refreshes the Status column from `bg_procs` and, while
  # any process is unfinished, re-checks all of them once per second.
  observe({
    procs <- bg_procs()
    stati <- procs$finished
    req(length(stati) > 0)
    # Rebuild Status from the process table. `isolate()` keeps table edits
    # from re-triggering this observer; `.keep = "none"` drops the joined
    # `proc` / `finished` helper columns again.
    data$table_data <- isolate(data$table_data) %>%
      left_join(procs, by = c(id = "name")) %>%
      mutate(id,
             Dropdown1,
             Dropdown2,
             Dropdown3,
             Status = case_when(
               is.na(finished) ~ "Pending",
               finished ~ "Completed",
               !finished ~ "In Progress"),
             .keep = "none")
    # Stop polling once everything is done; a newly added process changes
    # `bg_procs` and wakes this observer up again.
    req(!all(stati))
    invalidateLater(1000)
    running <- map_lgl(procs$proc, ~ .x$is_alive())
    # Finished since the previous poll (not alive now, not yet flagged).
    now_finished <- !running & !stati
    ## status message
    if (any(running)) {
      msg <- paste("{qty(sum(running))} Process{?es}",
                   "{procs$name[running]}",
                   "still busy at {Sys.time()}")
      cli_alert_info(msg)
    }
    if (any(now_finished)) {
      msg <- paste("{qty(sum(now_finished))} Process{?es}",
                   "{procs$name[now_finished]}",
                   "FINISHED at {Sys.time()}")
      cli_alert_success(msg)
    }
    ## update tables
    bg_procs(procs %>%
               mutate(finished = !running))
  })

  observeEvent(input$deleteRows, {
    # Delete selected rows
    if (!is.null(input$output_table_rows_selected)) {
      idx <- as.numeric(input$output_table_rows_selected)
      # Stop and forget the processes that belong to the deleted rows;
      # otherwise they keep running and keep being polled and logged.
      removed_ids <- data$table_data$id[idx]
      procs <- bg_procs()
      doomed <- procs$name %in% removed_ids
      if (any(doomed)) {
        walk(procs$proc[doomed], ~ .x$kill())
        bg_procs(procs[!doomed, ])
      }
      data$table_data <- data$table_data[-idx, ]
    }
  })

  # Render the table; single-row selection keeps `row_index` a scalar.
  output$output_table <- renderDT({
    data$table_data
  }, selection = "single")
}
© www.soinside.com 2019 - 2024. All rights reserved.