通过 Rscript 运行时 doPar 不会初始化

问题描述 投票:0回答:1

我想创建一个 Rscript,它使用

dopar
来利用多个内核并行统计多个文件的行数,然后输出一个 TSV,其中包含每个文件名及其读段数(行数除以 4)。

我使用的是 Ubuntu 20.04。如果我通过打开 R 并复制/粘贴来迭代使用以下脚本,它就会起作用。如果我在命令行中使用

Rscript
使用脚本,则会出现以下错误:
ssh: connect to host 0.0.0.20 port 22: Connection timed out
。我认为这是 PSOCK 集群注册的问题。如何在命令行中使用
Rscript
使该脚本正常工作?

(注意:以交互方式逐行运行时,我会先加载所需的库,然后只需定义

raw.path
min.reads
n.cores
,然后再毫无问题地复制和粘贴脚本的其余部分。)

#! /usr/bin/Rscript

###### 
## Counting and excluding low read fastqs

# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file


## Collect arguments
## Collect arguments
args <- commandArgs(TRUE)

## Parse arguments (we expect the form --arg=value).
## Returns a list with one character vector per argument: c(name, value) for
## well-formed pairs, or c(name) for bare flags such as --help.
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")

parsed <- parseArgs(args)
## Separate well-formed --key=value pairs from bare flags.  The original
## rbind/$V2 approach crashed on bare flags: --help produced a one-column
## matrix, $V2 was NULL, and the names<- assignment failed on a zero-length
## list.  Parsing once (instead of twice) also keeps name/value aligned.
is.pair <- vapply(parsed, function(p) length(p) >= 2, logical(1))
argsL <- lapply(parsed[is.pair], function(p) p[2])
names(argsL) <- vapply(parsed[is.pair], function(p) p[1], character(1))
## Keep bare flags visible as TRUE entries (e.g. args$help) for later checks.
for (flag in vapply(parsed[!is.pair], function(p) p[1], character(1))) {
  argsL[[flag]] <- TRUE
}
args <- argsL
rm(argsL, parsed, is.pair)

## Print usage and quit when help is requested or a required argument is
## missing.  `||` (scalar, short-circuiting) replaces `|`, the idiomatic form
## for a scalar if() condition.  The parser strips the leading "--" before
## this check runs, so the original `"--help" %in% args` test could never
## match; `"help" %in% names(args)` catches the flag after parsing.
if("--help" %in% args || "help" %in% names(args) ||
   is.null(args$files) || is.null(args$min_reads)) {
  cat("
      Counts the number of reads in the gzipped FASTQs in a folder, then moves
      files below a certain number of reads to a folder called 'less-than-[min_reads]'
  
      --files=string        - Full path to folder with raw files
      --min_reads=numeric   - Minimum number of reads for a file to be included
      --n_cores=numeric     - Number of cores to use
      
      Example:
      Rscript count_and_move_raw_fastqs.R --files=raw/ \\   \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
  
  q(save="no")
}

## Import arguments
## commandArgs() values are always character strings, so the numeric ones are
## converted here.  Left as character, `reads <= min.reads` silently compares
## STRINGS, and makePSOCKcluster() receives a character "core count" that it
## interprets as a hostname (the "ssh: connect to host ..." failure).
raw.path <- args$files
min.reads <- as.numeric(args$min_reads)
n.cores <- as.numeric(args$n_cores)

# Import libraries
library(foreach)
library(doParallel)
library(tidyverse)

# set wd to raw.path
setwd(raw.path)


# Set up parallelization with n cores.
# makePSOCKcluster() takes the worker count (numeric) OR a character vector
# of hostnames as its first argument `names`; `nnodes` is not an argument of
# this function at all.  Passing the raw command-line value as `names` made R
# treat it as a HOSTNAME and try to ssh to it ("ssh: connect to host ...
# port 22"), which is why the script only failed under Rscript.  as.integer()
# guards against the value still being a character string.
my.cluster <- makePSOCKcluster(names = as.integer(n.cores), outfile = "")
registerDoParallel(cl = my.cluster)


# Destination folder for every sample whose read count falls below the cutoff
output_dir <- paste0("files-with-less-than", min.reads, "-reads")
dir.create(output_dir)

# Create function to count the lines in the file, and move files < min.reads to a different directory
#
# Counts the reads in one gzipped FASTQ, quarantines all files sharing its
# sample name when the count is at or below the threshold, and appends a
# "<name>\t<reads>" line to a summary file in the working directory.
#
# NOTE(review): relies on globals -- min.reads, output_dir, and a working
# directory containing the FASTQs.  min.reads comes straight from
# commandArgs(), so it is presumably character here; `reads <= min.reads`
# would then do a string comparison -- confirm it is converted to numeric
# before this runs.
process_file <- function(file){
  
  # Get the base name (without R1) for each file
  # e.g. "sampleA_S1_R1_001.fastq.gz" -> everything before the first "_"
  file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
  
  # Count the reads
  # Shell pipeline: decompress, `sed -n '$='` prints the last line number,
  # awk divides by 4 (a FASTQ record is 4 lines); stdout is captured.
  message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
  reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
  
  # Move files if <= min.reads
  if(reads <= min.reads){
    # Find all files with the name (also catches the R2 mate)
    # NOTE(review): file.name is used as a REGEX by list.files(); sample
    # names containing metacharacters (e.g. ".") may over-match -- verify.
    files.to.move <- list.files(pattern = file.name)
    # Move them
    purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
  }
  
  # Create final df of reads and names
  # NOTE(review): under %dopar% several PSOCK workers append to this one file
  # concurrently; writes can interleave and garble lines (the answer below
  # avoids this by returning the line and writing serially).
  output_txt <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
  cat(paste(file.name, reads, "\n", sep = "\t"), file = output_txt, append = TRUE)
}

# Find the files to count.  list.files(pattern=) takes a regular expression,
# so the dots are escaped to match literal ".fastq.gz"; the original
# unescaped "." matched any character.
files.to.count <- list.files(getwd(), pattern = "_R1_001\\.fastq\\.gz$")

# Loop over each file in parallel on the doParallel backend registered above
foreach(file=files.to.count, .combine = c) %dopar%{
    process_file(file)
  }

# Shut the PSOCK workers down cleanly
stopCluster(my.cluster)
r linux bash rscript doparallel
1个回答
0
投票

由于您在类 Unix 上运行,请考虑仅使用 parallel 包的

mclapply()
函数:

#! /usr/bin/Rscript

###### 
## Counting and excluding low read fastqs

# This script counts the number of reads in a fastq and moves those under a certain
# threshold to a separate file


## Collect arguments
## Collect arguments
args <- commandArgs(TRUE)

## Parse arguments (we expect the form --arg=value).
## Returns a list with one character vector per argument: c(name, value) for
## well-formed pairs, or c(name) for bare flags such as --help.
parseArgs <- function(x) strsplit(sub("^--", "", x), "=")

parsed <- parseArgs(args)
## Separate well-formed --key=value pairs from bare flags.  The original
## rbind/$V2 approach crashed on bare flags: --help produced a one-column
## matrix, $V2 was NULL, and the names<- assignment failed on a zero-length
## list.  Parsing once (instead of twice) also keeps name/value aligned.
is.pair <- vapply(parsed, function(p) length(p) >= 2, logical(1))
argsL <- lapply(parsed[is.pair], function(p) p[2])
names(argsL) <- vapply(parsed[is.pair], function(p) p[1], character(1))
## Keep bare flags visible as TRUE entries (e.g. args$help) for later checks.
for (flag in vapply(parsed[!is.pair], function(p) p[1], character(1))) {
  argsL[[flag]] <- TRUE
}
args <- argsL
rm(argsL, parsed, is.pair)

## Print usage and quit when help is requested or a required argument is
## missing.  `||` (scalar, short-circuiting) replaces `|`, the idiomatic form
## for a scalar if() condition.  The parser strips the leading "--" before
## this check runs, so the original `"--help" %in% args` test could never
## match; `"help" %in% names(args)` catches the flag after parsing.
if("--help" %in% args || "help" %in% names(args) ||
   is.null(args$files) || is.null(args$min_reads)) {
  cat("
      Counts the number of reads in the gzipped FASTQs in a folder, then moves
      files below a certain number of reads to a folder called 'less-than-[min_reads]'
  
      --files=string        - Full path to folder with raw files
      --min_reads=numeric   - Minimum number of reads for a file to be included
      --n_cores=numeric     - Number of cores to use
      
      Example:
      Rscript count_and_move_raw_fastqs.R --files=raw/ \\   \n --min_reads=2e6 \\ \n --n_cores=10 \n\n")
  
  q(save="no")
}

## Import arguments
## commandArgs() values are always character strings, so the numeric ones are
## converted here.  Left as character, `reads <= min.reads` silently compares
## STRINGS, and mclapply(mc.cores = n.cores) is handed a character value
## where an integer core count is expected.
raw.path <- args$files
min.reads <- as.numeric(args$min_reads)
n.cores <- as.numeric(args$n_cores)

library(tidyverse)

# Work inside the folder that holds the raw FASTQ files
setwd(raw.path)

# Destination folder for every sample whose read count falls below the cutoff
output_dir <- paste0("files-with-less-than", min.reads, "-reads")
dir.create(output_dir)

# Create function to count the lines in the file, and move files < min.reads to a different directory
#
# Counts the reads in one gzipped FASTQ, quarantines all files sharing its
# sample name when the count is at or below the threshold, and RETURNS the
# "<name>\t<reads>" summary line instead of writing it -- the caller collects
# the lines and writes them serially, avoiding concurrent writes to one file.
#
# NOTE(review): relies on globals -- min.reads, output_dir, and a working
# directory containing the FASTQs.  min.reads must be numeric here or
# `reads <= min.reads` performs a string comparison -- confirm the
# command-line value is converted before this runs.
process_file <- function(file){
  
  # Get the base name (without R1) for each file
  # e.g. "sampleA_S1_R1_001.fastq.gz" -> everything before the first "_"
  file.name <- stringr::str_split_fixed(file, "_", n = 2)[,1]
  
  # Count the reads
  # Shell pipeline: decompress, `sed -n '$='` prints the last line number,
  # awk divides by 4 (a FASTQ record is 4 lines); stdout is captured.
  message(paste("Counting reads in ", file, " as of ", Sys.time(), sep = ""))
  reads <- as.numeric(system(paste("gunzip -c ", file, " | sed -n '$=' | awk '{print $1/4}'", sep = ""), intern = TRUE))
  
  # Move files if <= min.reads
  if(reads <= min.reads){
    # Find all files with the name (also catches the R2 mate)
    # NOTE(review): file.name is used as a REGEX by list.files(); sample
    # names containing metacharacters (e.g. ".") may over-match -- verify.
    files.to.move <- list.files(pattern = file.name)
    # Move them
    purrr::map(files.to.move, function(x) file.rename(x, paste(output_dir, x, sep = "/")))
  }
  
  # Create final list of reads and names
  paste(file.name, reads, "\n", sep = "\t")
}

# Find the files to count.  list.files(pattern=) takes a regular expression,
# so the dots are escaped to match literal ".fastq.gz"; the original
# unescaped "." matched any character.
files.to.count <- list.files(getwd(), pattern = "_R1_001\\.fastq\\.gz$")

# Process each file in parallel (forked workers share memory on Unix) and
# collect one "name\treads\n" line per file.  `<-` replaces `=` for
# top-level assignment, and as.integer() guards against n.cores arriving as
# a character command-line value.
output_txt <- parallel::mclapply(files.to.count, process_file,
                                 mc.cores = as.integer(n.cores))

# Serial write of the list of reads and names.  A single cat() call is used
# because cat(file = path) TRUNCATES the file on every call (append defaults
# to FALSE): the original lapply(output_txt, cat, file = outfile) left only
# the LAST line on disk.
outfile <- file.path(getwd(), "reads_and_names_for_raw_files.txt")
cat(unlist(output_txt), file = outfile, sep = "")

这将减少内存开销,因为分叉的并行 R 会话共享大部分内存空间。通过套接字连接的 R 会话集合则不然。

此外,代码不是使用

cat
进行并行写入,而是将文本行收集到列表中,该列表是用普通的串行“lapply”写入的。不建议并行写入一个文件,除非特别注意写入者不会互相冲突,从而导致乱码文本。

我没有运行你的代码,因为我没有输入文件。让我知道它对您有何作用。

© www.soinside.com 2019 - 2024. All rights reserved.