Outline how to parallelise multiple simulations effectively on one or more machines.
This exercise builds upon the earlier examples of model simulation.
The packages used can be attached with
library("dynatop")
library("terra")
library("xts")
library("parallel")
and remember to move to the eden_data directory, e.g.
setwd("../eden_data")
The aim of this section is to show how to efficiently perform multiple independent runs of dynatop. In the code below we outline how to set up 10000 simulations with the same observed data but different parameters. Since the simulations do not interact, each parallel simulation must keep its own copy of the memory used (at least that storing the dynatop object). This makes the problem distinct from using multiple computational processes acting on the same memory to speed up a single simulation.
A simple but effective workflow is as follows.
As shown earlier, applying parameters to the HRUs is most effectively done with a function that applies the changes to a single HRU. In this case the function requires both a HRU description and a named vector of new parameters.
## h is the hru, p is a named vector of parameter values
fparChange <- function(h,p){
    ## change "m" transmissivity decay parameter depending upon atb class
    if( !is.na(h$class$atb_20) ){
        if( h$class$atb_20 < 20 ){
            h$sz$parameters["m"] <- p["m1"]
        }else{
            h$sz$parameters["m"] <- p["m2"]
        }
    }
    ## change unsaturated zone time constant regardless of type
    h$uz$parameters["t_d"] <- p["Td"]
    ## change surface velocity depending upon whether the HRU is a channel
    if( is.na(h$class$startNode) ){
        ## hillslope
        h$sf$parameters["c_sf"] <- p["csf"]
    }else{
        ## channel
        h$sf$parameters["c_sf"] <- p["vch"]
    }
    return(h)
}
## save the function
tmp <- deparse(fparChange); tmp[1] <- paste("fparChange <-",tmp[1])
writeLines(tmp,file.path(".","dyna","fparChange.R"))
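Before generating the full sample it may be worth applying the function to a single HRU of the model and checking that the parameters change as expected. The sketch below assumes the model saved earlier as atb_20_model.rds; the values in ptest are arbitrary test values.
## a quick, illustrative check of fparChange on a single HRU (test values are arbitrary)
mdl <- readRDS(file.path(".","dyna","atb_20_model.rds"))
ptest <- c(Td = 20, m1 = 0.01, m2 = 0.02, csf = 0.1, vch = 0.8)
htest <- fparChange(mdl$hru[[1]], ptest)
## the unsaturated zone time constant and surface velocity should now reflect ptest
mdl$hru[[1]]$uz$parameters["t_d"]; htest$uz$parameters["t_d"]
mdl$hru[[1]]$sf$parameters["c_sf"]; htest$sf$parameters["c_sf"]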
Next, generate a random sample of the parameter values used in the function, along with a simulation identifier. This is stored in a numeric matrix. Generating all the samples at once avoids many of the issues of repeatability and uniqueness which may arise if they are generated separately.
## number of simulations to set up
nsim <- 10000
sample <- cbind(
sid = 1:nsim, ## the simulation identifier number
Td = runif(nsim,0.1,120),
m1 = runif(nsim,0.005,0.06),
m2 = runif(nsim,0.005,0.06),
csf = runif(nsim,0.05,0.5),
vch = runif(nsim,0.2,1.4))
## save the random sample
saveRDS(sample,file.path(".","dyna","sample.rds"))
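If the sample needs to be reproduced exactly, for example when rerunning the study on another machine, the random number generator seed can be fixed before the calls to runif above; the seed value in this sketch is arbitrary.
## fixing the seed before drawing the sample makes it reproducible (seed value is arbitrary)
set.seed(1605)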
The purpose of the evaluation function is to take a single row of the parameter sample, apply the values to the model, run the simulation and return a summary of the performance, saving any further output of interest along the way. Below is an example of such a function.
## normally this function would be entered into its own file;
## here it is entered into the session so it can be shown, then written to a file.
## Inputs: i - the index of a row of sample; sample - the matrix of parameter values;
## mdl - the model; obs - the observed data; q0 - the initial recharge rate;
## fP - the function used to apply the parameters to a single HRU
fMC <- function(i,sample,mdl,obs,q0,fP){
    x <- sample[i,] # the parameter values to substitute in
    ## put sample values into the model
    hru <- lapply( mdl$hru, fP, p=x )
    ## put initial conditions into the model
    hru <- lapply(hru, function(h,q0){ h$initialisation["r_uz_sz_0"] <- q0; h }, q0=q0)
    ## simulate the model
    dt <- dynatop::dynatop$new(hru)$add_data(obs)$initialise()$sim(mdl$output_flux)
    ## get mass balance
    mb <- dt$get_mass_errors()
    if( any(abs(mb$err) > 1e-3) ){ ## an error of greater than 1 m^3
        stop("Mass balance error in sid ",x["sid"])
    }
    ##ch_in <- dt$get_channel_inflow()
    gf <- dt$get_output()
    if( any(is.na(gf)) ){
        stop("Missing flow values returned")
    }
    ## ns = Nash-Sutcliffe efficiency at the Sheepmount gauge
    ns <- 1 - sum((obs$Sheepmount_obs-gf$Sheepmount)^2,na.rm=TRUE) /
        sum((obs$Sheepmount_obs - mean(obs$Sheepmount_obs,na.rm=TRUE))^2,na.rm=TRUE)
    ## if the performance is good save some outputs for later use
    if( ns > 0.5 ){
        saveRDS(
            list(sid = x["sid"],
                 gauge_flow = gf,
                 mass_balance = mb),
            file.path(".","dyna",paste0("Simulation_",x["sid"],".rds"))
        )
    }
    return(ns)
}
## save the function
tmp <- deparse(fMC); tmp[1] <- paste("fMC <-",tmp[1])
writeLines(tmp,file.path(".","dyna","fMC.R"))
An example script for running the simulations, either on a single R process or in parallel across the cores of one machine, would be
rm(list=ls())
library(dynatop)
library(parallel)
## source function for running
source( file.path(".","dyna","fMC.R") ) ## source the function so it is available
## load model
mdl <- readRDS(file.path(".","dyna","atb_20_model.rds"))
## load obs
obs <- readRDS(file.path(".","processed","obs.rds"))
## load the parameter function
source( file.path(".","dyna","fparChange.R"))
## load sample
sample <- readRDS(file.path(".","dyna","sample.rds"))
## load gauges
mdl$output_flux <- readRDS(file.path(".","dyna","gauge_output_defn.rds"))
## initial recharge rate estimated from the first observed flow divided by the total area
q0 <- as.numeric(obs$Sheepmount_obs[1]) / sum( sapply(mdl$hru,function(h){h$properties["area"]}) )
## this wraps the fMC function to catch any errors and stop them killing the whole set of simulations
fout <- function(i,sample,mdl,obs,q0,fP){
tryCatch({fMC(i,sample,mdl,obs,q0,fP)},
error=function(e){e$message}) ## if an error return the message
}
## use lapply to iterate over the parameter sets - this will run on a single R process
## out <- lapply(seq_len(nrow(sample)), fout, sample=sample, mdl=mdl, obs=obs, q0=q0, fP=fparChange)
## on linux/mac use mclapply to run across multiple cores on the same machine
## alter mc.cores to match the machine being used
## out <- mclapply(seq_len(nrow(sample)), fout, sample=sample, mdl=mdl, obs=obs, q0=q0, fP=fparChange,
##                 mc.cores=6, mc.preschedule=FALSE)
## on windows/linux/mac you could use parLapply but the set up is more complex
cl <- makePSOCKcluster(6) # alter the number to the number of cores you want to use
clusterExport(cl,"fMC") # fout calls fMC, so fMC must be exported to the workers
out <- parLapply(cl, seq_len(nrow(sample)), fout, sample=sample, mdl=mdl, obs=obs, q0=q0, fP=fparChange)
stopCluster(cl)
## The `future` package offers an alternative framework for such parallel computations.
## save out after naming with the simulation number
names(out) <- paste0("sid_",sample[,"sid"])
saveRDS(out,"output.rds")
The above script could be run from a shell command line using
Rscript <my_script_name>.R
Many HPC job submission systems allow the submission of \(n_j\) multiple, near identical jobs, each identified as job \(j\) of \(n_j\).
The above script can be adapted for this by using command line arguments to receive \(j\) and \(n_j\)
## process the command line arguments
arg <- as.integer(commandArgs(trailingOnly = TRUE))
if( length(arg)!=2 || !all(is.finite(arg)) ){ stop("Invalid command line arguments") }
j <- arg[1]
nj <- arg[2]
library(dynatop)
source( file.path(".","dyna","fMC.R") ) ## source the function so it is available
## load model
mdl <- readRDS(file.path(".","dyna","atb_20_model.rds"))
## load obs
obs <- readRDS(file.path(".","processed","obs.rds"))
## load the parameter function
source( file.path(".","dyna","fparChange.R"))
## load sample
sample <- readRDS(file.path(".","dyna","sample.rds"))
## load gauges
mdl$output_flux <- readRDS(file.path(".","dyna","gauge_output_defn.rds"))
q0 <- as.numeric(obs$Sheepmount_obs[1]) / sum( sapply(mdl$hru,function(h){h$properties["area"]}) )
## this wraps the fMC function to catch any errors and stop them killing the whole set of simulations
fout <- function(i){
    tryCatch({fMC(i,sample,mdl,obs,q0,fparChange)},
             error=function(e){e$message}) ## if an error occurs return the message
}
## use lapply to iterate over the parameter sets - this will run on a single R process
idx <- seq(j,nrow(sample),by=nj) ## evaluate every nj-th row starting at row j
out <- lapply(idx, fout)
## save out after naming with the simulation number
names(out) <- paste0("sid_",sample[idx,"sid"])
saveRDS(out,paste0("output_",j,".rds"))
This script could be run from a shell command line for job \(j=1\) of \(n_j=10\) with
Rscript <my_script_name>.R 1 10
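After all \(n_j\) jobs have completed the per-job output files can be combined into a single list for further analysis. A minimal sketch, assuming \(n_j=10\) and that all the output_<j>.rds files are in the working directory, is
## combine the per-job results into a single named list (here nj = 10 is assumed)
nj <- 10
out <- do.call(c, lapply(1:nj, function(j){ readRDS(paste0("output_",j,".rds")) }))
## reorder to match the original sample order using the names sid_<sid>
sample <- readRDS(file.path(".","dyna","sample.rds"))
out <- out[ paste0("sid_",sample[,"sid"]) ]
saveRDS(out,"output.rds")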