Aim

Outline how to parallelise multiple simulations effectively on one or more machines.

Required R packages

This exercise builds upon the earlier examples of model simulation.

The packages used can be attached with

library("dynatop")
library("terra")
library("xts")
library("parallel")

and remember to move to the eden_data directory e.g.

setwd("../eden_data")

Target Problem

The aim of this section is to address how to efficiently perform multiple independent runs of dynatop. In the code below we outline how to set up 10000 simulations with the same observed data but different parameters. Since the simulations do not interact the memory used (at least for storing the dynatop object) must be separate for each parallel simulation. This means the problem is distinct from using multiple computational processes to act on the same memory to speed up a single simulation.

Work Flow

A simple but effective workflow is

A Pattern for replacing the parameters

As shown earlier apply parameters to a HRU is effectively done with a function that apply the changes to a single HRU. In this case the function requires both a HRU description and vector or new parameters.

## h is the hru, p is a named vector of parameter values
fparChange <- function(h,p){
    ## change "m" transmissivty decay parameter depending upon atb class
    if( !is.na(h$class$atb_20) ){
        if( h$class$atb_20 < 20 ){
            h$sz$parameters["m"] <- p["m1"]
        }else{
            h$sz$parameters["m"] <- p["m2"]
        }
    }
    ## change unsaturated zone time constant regarless of type
    h$uz$parameters["t_d"] <- p["Td"]
    ## change surface velocity depending upon if it is a channel
    if( is.na(h$class$startNode) ){
        ## hillslope
        h$sf$parameters["c_sf"] <- p["csf"]
    }else{
        ## channel
        h$sf$parameters["c_sf"] <- p["vch"]
    }
    return(h)
}

## save the function
tmp <- deparse(fparChange); tmp[1] <- paste("fparChange <-",tmp[1])
writeLines(tmp,file.path(".","dyna","fparChange.R"))

Random Sample

Generate a random sample of the values used in the pattern, along with a sample identifier. This is stored in a numeric matrix. Generating all the samples at once avoids many issues of repeatability and uniqueness which may arise if they are generated separately.

## start by loading the model for reference
nsim <- 10000
sample <- cbind(
  sid = 1:nsim, ## the simulation identifier number
  Td = runif(nsim,0.1,120),
  m1 = runif(nsim,0.005,0.06),
  m2 = runif(nsim,0.005,0.06),
  csf = runif(nsim,0.05,0.5),
  vch = runif(nsim,0.2,1.4))
## save the random sample
saveRDS(sample,file.path(".","dyna","sample.rds"))

Evaluation function

The purpose of the evaluation function is to

Below is an example of such a function

## normally this function would be entered into its own file.
## Here to show it we enter it then add it to a file
## inputs are i the index of a row of sample and the model - which is altered by the function
## It expects the sample, pattern, obs and q0 to be present in the workspace fMC is called from
## it does not need its own copy of these since they are not changed
fMC <- function(i,sample,mdl,obs,q0,fP){
  x <- sample[i,] # the values to substitute in
  ## put sample values into model
  hru <- lapply( mdl$hru, fP,p=x) 
  ## put intial conditions into the model
  hru <- lapply(hru,function(h,q0){h$initialisation["r_uz_sz_0"] <- q0; h},q0=q0)
    ## simulate the model
  dt <- dynatop::dynatop$new(hru)$add_data(obs)$initialise()$sim(mdl$output_flux)
  ## get mass balance
  mb <- dt$get_mass_errors()
    if( any(abs(mb$err)>1e-3) ){ ## this is an error of greater then 1m^3
    stop("Mass Balance error in sid ",x["sid"])
  }
  ##ch_in <- dt$get_channel_inflow()
  gf <- dt$get_output()
  if(any(is.na(gf))){
    stop("non-finite flow returned")
  }
  ## ns = nash-sutcliffe
  ns <- 1 - sum((obs$Sheepmount_obs-gf$Sheepmount)^2,na.rm=TRUE) /
    sum((obs$Sheepmount_obs - mean(obs$Sheepmount_obs,na.rm=TRUE))^2,na.rm=TRUE)
  ## if good performance save some outputs to use
  if(ns>0.5){ 
    saveRDS(
      list(sid = x["sid"],
           gauge_flow=gf,
           mass_balance = mb),
      file.path(".","dyna",paste0("Simulation_",x["sid"],".rds"))
    )
  }
  return(ns)
}

## save the function
tmp <- deparse(fMC); tmp[1] <- paste("fMX <-",tmp[1])
writeLines(tmp,file.path(".","dyna","fMC.R"))

Execution

To run the simulations on a single R process an example script would be

rm(list=ls())
library(dynatop)

## source function for running
source( file.path(".","dyna","fMC.R") ) ## source the function so it is available
## load model
mdl <- readRDS(file.path(".","dyna","atb_20_model.rds"))
## load obs
obs <- readRDS(file.path(".","processed","obs.rds"))
## load the parameter function
source( file.path(".","dyna","fparChange.R"))
## load sample
sample <- readRDS(file.path(".","dyna","sample.rds"))
## load gauges
mdl$output_flux <- readRDS(file.path(".","dyna","gauge_output_defn.rds"))


q0 <- obs$Sheepmount_obs[1] / sum( sapply(mdl$hru,function(h){h$properties["area"]}) )

## this wraps the fMC function to catch any errors and stop then killing the simulation 
fout <- function(i,sample,mdl,obs,q0,fP){
    tryCatch({fMC(i,sample,mdl,obs,q0,fP)},
             error=function(e){e$message}) ## if an error return the message
}

## use lapply to iterate over the parameter sets - this will run on a single R process
#out <- lapply(seq_len(nrow(sample)), fout)

## on linux/mac use mclapply to run across multiple cores on the same machine
## alter mc.cores to match the machine being used
## out <- mclapply(1:6,fMC,sample=sample,mdl=mdl,obs=obs,q0=q0,fP=fparChange,mc.cores=6,mc.preschedule = FALSE)

## on windows/linux/mac you could use parLapply but the set up is more complex
cl <- makePSOCKcluster(6) # alter number to be number of cores on machine you want to use
out2 <- parLapply(cl,1:6,fMC,sample=sample,mdl=mdl,obs=obs,q0=q0,fP=fparChange)
stopCluster(cl)
## The `future` package offer an alternative.


## save out after naming with the simulation number
names(out) <- paste0("sid_",sample[,"sid"])
saveRDS(out,"output.rds")

The above script could be run from a shell command line using

Rscript <my_script_name>.R

many HPC job submission systems allow for the submission of \(n_j\) multiple, near identical jobs, identified as job \(j\) of \(n_j\).

The above script can be adapted for this using command line arguments to receive \(j\) and \(jn\)

## prices the command line arguments
arg <- as.integer(commandArgs(trailingOnly = TRUE))
if(!all(is.finite(arg))){ stop("Invalid command line arguments") }
j <- arg[1]
nj <- arg[2]
    

library(dynatop)
source( file.path(".","dyna","fMC.R") ) ## source the function so it is available
## load model
mdl <- readRDS(file.path(".","dyna","atb_20_model.rds"))
## load obs
obs <- readRDS(file.path(".","processed","obs.rds"))
## load the parameter function
source( file.path(".","dyna","fparChange.R"))
## load sample
sample <- readRDS(file.path(".","dyna","sample.rds"))
## load gauges
mdl$output_flux <- readRDS(file.path(".","dyna","gauge_output_defn.rds"))

q0 <- as.numeric(obs$Sheepmount_obs[1]) / sum( sapply(mdl$hru,function(h){h$properties["area"]}) )

## this wraps the fMC function to catch any errors and stop then killing the simulation 
fout <- function(i){
  tryCatch({fMC(i,sample,mdl,obs,q0,fP)},
           error=function(e){e$message}) ## if an error return the message
}

## use lapply to iterate over the parameter sets - this will run on a single R process
idx <- seq(j,nrow(sample),by=nj) ## evaluate every nj rows
out <- lapply(idx, fout)

## save out after naming with the simulation number
names(out) <- paste0("sid_",sample[idx,"sid"])
saveRDS(out,paste0("output_",j,".rds"))

This script could be run from a shell command line for job \(j=1\) of \(n_j=10\) with

Rscript <my_script_name>.R 1 10