结合并行和 pbdMPI 包

问题描述 投票:0回答:1

我使用 mpirun 通过 MPI 并行化执行 R 脚本,使用 Snowfall 包。

我设置了并行化 '''

sfInit(并行= TRUE) ''' 那么我有 *** MPI_Init 发生错误 *** 在 NULL 通信器上 *** MPI_ERRORS_ARE_FATAL(此通信器中的进程现在将中止, *** 以及可能是您的 MPI 工作) [red075.cluster.local:190363] MPI_INIT 完成之前本地中止已成功完成,但无法聚合错误消息,并且无法保证所有其他进程都被终止!

当我使用 mpirun 启动脚本并在脚本中添加 '''sfInit(parallel = TRUE, type = "MPI")'' 时,出现错误 makeMPIcluster(.sfOption$nodes, outfile = tmp, homousous = TRUE, 中出现错误: 大小为 39 的 MPI 集群已在运行 sfInit(parallel = TRUE, type = "MPI") 中的错误: 雪簇启动失败! makeMPIcluster(.sfOption$nodes, outfile = tmp, homousous = TRUE, 中出现错误: 大小为 39 的 MPI 集群已在运行 makeMPIcluster(.sfOption$nodes, outfile = tmp, homousous = TRUE, 中出现错误: 大小为 39 的 MPI 集群已在运行 执行停止

我尝试不使用 sfInit,但代码将以顺序模式而不是并行模式运行。 '''

sfSource("CopulaFunctions.R") ''' 那么我有: 调用降雪函数而不先调用“sfInit”或在 sfStop() 之后调用。 现在调用“sfInit()”。 降雪1.84-6.3初始化:顺序执行,一个CPU。

===== 编辑:添加 SLURM 作业脚本 在 R 代码中我使用

library(snowfall)    
sfSetMaxCPUs( number = 40 )
sfInit(parallel = TRUE, cpus = 40, type = "MPI")  ...etc

是的,我在 MPI 集群上提交了 Slurm 作业

 #!/bin/bash

#SBATCH --nodes=1                # Number of nodes requested
#SBATCH --ntasks-per-node=40     # Tasks per node
#SBATCH --time=20:00:00
 
module load openmpi/4.1.4/intel
module load gsl/2.7
module load R
 
 
 
# Run R script in batch mode
mpirun  -n 40 /local/software/R/4.2.1/build-gcc/lib64/R/library/snow/RMPISNOW CMD BATCH test_rmpi.R outputrmpi.out

========更新:这是修复错误并能够成功运行代码后的代码。


library(xts)
library(rugarch)
library(mvtnorm)
library(copula)
library(pbdMPI)
library(parallel)
cores <- 40

# here reading the data  (reading or defining n, N, lag, q, V,ll)

# DAX 30
DAX.data  <- read.csv("GDAXI.csv", header=TRUE)  # make sure csv-file is in working directory!
DAX       <- rev(DAX.data$Close)                        # DAX closing prices from 2000-01-01 to 2020-12-31
Datum.DAX <- rev(DAX.data$Date)[-1]                     # read in dates and ...
Datum.DAX <- as.Date(Datum.DAX, "%m/%d/%y")             # ... convert dates.
lr.DAX    <- diff( log(DAX) )                           # Investigate losses
lr.DAX    <- xts(x = lr.DAX,  order.by = Datum.DAX)     # create xts-object for easier manipulation

# GSPC
GSPC.data  <- read.csv("GSPC.csv", header=TRUE)  # make sure csv-file is in working directory!
GSPC       <- rev(GSPC.data$Close)                      # GSPC closing prices from 2000-01-01 to 2020-12-31
Datum.GSPC <- rev(GSPC.data$Date)[-1]                   # read in dates and ...
Datum.GSPC <- as.Date(Datum.GSPC, "%m/%d/%y")           # ... convert dates.
lr.GSPC    <- diff( log(GSPC) )                         # Investigate losses
lr.GSPC    <- xts(x = lr.GSPC,  order.by = Datum.GSPC)  # create xts-object for easier manipulation

# merge both DAX and GSPC
V.lr    <- merge.xts(lr.DAX, lr.GSPC, all=FALSE)        # merged log-returns and ...
V.ll    <- -V.lr                                        # ... log-losses
Datum   <- index(V.lr)                                  # save dates for later use
N       <- nrow(V.lr)                                   # total length of time series (includes some observations after 2019)
n       <- 1000                                         # length of rolling window


# ===========================================================
# 2. Rolling-window analysis of CoVaR and VaR forecasts
# ===========================================================


lag    <- 1     # use estimated model to forecast (lag)-steps-ahead, then re-estimate (i.e., we use daily re-estimates)
p      <- 0.95   # equals \alpha in paper
q      <- 0.95 



source("CopulaFunctions.R")
source("MLEstimation.R")
source("RMApplication.R")

wrapper <- function(i, n, lag, p, q, spec, V.ll){
  (set.seed(12345 + i) ) # set rng stream for ith loop item    
  # 1. Estimate rolling-window parameters
  R   <- t(coredata(V.ll[(i-n+1) : (i+lag), ]))
  Est <- GARCH.est(R, spec, lag, v=10)
 if (is.logical(Est) && !Est) {
    print("Estimation failed, skipping...")
    return(NA)
  }        # if estimators did not converge
  
  # 2. Produce rolling window CoVaR forecasts
  R <- t(coredata( V.ll[(i+1) : (i+lag), ] ))
  CoVaR.forecasts.t <- CoVaR.t.forecast(R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.t, lag, p, q)

  CoVaR.forecasts.Gauss <- CoVaR.Gauss.forecast( R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.Gauss, lag, p, q)
  
  return( list(CoVaR.forecasts.t     = CoVaR.forecasts.t,
               CoVaR.forecasts.Gauss = CoVaR.forecasts.Gauss) )
}




loop <- seq.int(from = n, to = N- 1, by = lag)

# GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "fGARCH", garchOrder = c(1, 1), submodel = "GARCH"),
                  mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
                  distribution.model = "norm")          # simple GARCH(1,1) specification
Forecasts.GARCH <- mclapply(loop, wrapper,n =n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)

print(Forecasts.GARCH)


# GJR-GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "gjrGARCH", garchOrder = c(1, 1)),
                mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
                  distribution.model = "norm")          # GJR-GARCH(1,1) specification

Forecasts.GJR  <- mclapply(loop,wrapper,n= n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)

print(Forecasts.GJR)

finalize()

我现在唯一的问题是如何在2个节点中运行它,我尝试请求2个节点,但不幸的是它计算了每次迭代两次。

任何建议如何处理。谢谢你

r mpi
1个回答
0
投票

从您在另一个问题中的评论中,我发现您尝试运行的代码很容易修改,以便与 MPI(通过 pbdMPI)和 Unix fork(通过

parallel::mclapply()
)的组合运行。

雪包早于 MPI 和 Slurm。尽管它已经更新,但它是为管理者-工作者编程而设计的,这并不是 MPI 在当今 Linux 集群上的典型使用方式。

此外,每个节点运行 40 个 MPI 实例将为每个节点制作 40 个数据副本。更好的方法是在节点内使用

parallel::mclapply
并使用 MPI 跨节点进行扩展。

尝试以下方法

code.R

library(xts)
library(rugarch)
library(mvtnorm)
library(copula)
library(pbdMPI)
library(parallel)
cores <- 40

# here reading the data  (reading or defining n, N, lag, q, V,ll)

source("CopulaFunctions.R")
source("MLEstimation.R")
source("RMApplication.R")

wrapper <- function(i, loop, n, lag, p, q, spec, V.ll){
  comm.set.stream(i) # set rng stream for ith loop item
  i <- loop[i] # select ith loop item
  
  # 1. Estimate rolling-window parameters
  R   <- t(coredata(V.ll[(i-n+1) : (i+lag), ]))
  Est <- GARCH.est(R, spec, lag, v=10)
  if(is.na(Est)){return(NA)}            # if estimators did not converge
  
  # 2. Produce rolling window CoVaR forecasts
  R <- t(coredata( V.ll[(i+1) : (i+lag), ] ))

  CoVaR.forecasts.t <- CoVaR.t.forecast(R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.t, lag, p, q)

  CoVaR.forecasts.Gauss <- CoVaR.Gauss.forecast( R, Est$U, Est$eps, Est$Fn.1, Est$Fn.2, Est$outp1, Est$outp2, Est$GAS.Gauss, lag, p, q)
  
  return( list(CoVaR.forecasts.t     = CoVaR.forecasts.t,
               CoVaR.forecasts.Gauss = CoVaR.forecasts.Gauss) )
}

loop    <- seq.int(from = n, to = N-lag, by = lag)    # loop over these
my_vec <- comm.chunk(length(loop), form = "vector", rng = TRUE, seed = 12345)

# GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "fGARCH", garchOrder = c(1, 1), submodel = "GARCH"),
                  mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
                  distribution.model = "norm")          # simple GARCH(1,1) specification
Forecasts.GARCH <- mclapply(my_vec, wrapper, loop=loop, n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)

saveRDS(Forecasts.GARCH, file = paste0("GARCH", comm.rank(), ".rds"))

# GJR-GARCH(1,1)
spec = ugarchspec(variance.model = list(model = "gjrGARCH", garchOrder = c(1, 1)),
                  mean.model = list(armaOrder = c(0, 0), include.mean = TRUE),
                  distribution.model = "norm")          # GJR-GARCH(1,1) specification

Forecasts.GJR  <- mclapply(my_vec, wrapper, loop=loop, n, lag=lag, p=p, q=q, spec, V.ll=V.ll, mc.cores = cores)

saveRDS(Forecasts.GJR, file = paste0("GJR", comm.rank(), ".rds"))

finalize()

使用此 SLURM.sh 将其提交到 Slurm:

#!/bin/bash
#SBATCH --output=output.job9.log
#SBATCH --error=error.job9.log
#SBATCH --nodes=2
#SBATCH --exclusive
#SBATCH --time=22:30:00

module load openmpi/4.1.4/intel
module load gsl/2.7
module load R/4.2.1
export OMPI_MCA_mpi_warn_on_fork=0

time mpirun --map-by ppr:1:node Rscript code.R

这将运行

code.R
的两个副本,每个副本都在其自己的循环部分上工作。它假设您的节点具有 40 个或更多核心。使用
mclapply()
的一大优点是输入数据位于共享内存中并且不会被复制。同时,您可以使用 SLURM.sh 脚本扩展到任意数量的节点 - code.R 通过将
length(loop)
拆分为
my_vec
块进行调整。每个循环索引都有自己的 RNG,因此可以通过节点和核心之间的任何分割来重现。

请注意,我已删除您的打印语句,因为最好只返回结果。不建议从并行运行的 40 个核心进行打印。另请注意,每个节点将结果(包含

length(my_vec)
元素的列表)保存在唯一命名的
.rds
文件中。您的运行时间是通过 shell 脚本中的
time
完成的。

我没有运行这个,因为我没有你的数据。让我知道这对你来说是如何运行的。

© www.soinside.com 2019 - 2024. All rights reserved.