Marie-Hélène Burle
January 31, 2023
To compile R packages, you need a C compiler.
In theory, you could use the proprietary Intel compiler which is loaded by default on the Alliance clusters, but it is recommended to replace it with the GCC compiler (R packages can even be compiled with Clang and LLVM, but the default GCC compiler is the best way to avoid headaches).
It is thus much simpler to always load a gcc module before loading an r module.
To see what versions of R are available on a cluster, run:
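For example, with the Lmod module system used on the Alliance clusters:

module spider r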
To see the dependencies of a particular version (e.g. r/4.2.1), run:
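For instance:

module spider r/4.2.1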
StdEnv/2020 is a required module for this version.
On most Alliance clusters, it is automatically loaded, so you don't need to include it. You can double-check with module list, or you can include it (before r/4.2.1) just to be sure.
Finally, load your modules:
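For example (the gcc version shown here is illustrative; use one listed by module spider):

module load gcc/9.3.0 r/4.2.1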
To install a package, launch the interactive R console with:
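That is, simply:

R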
In the R console, run:
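For instance (package_name is a placeholder for the package you want to install):

install.packages("package_name", repos = "https://cloud.r-project.org/")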
repos argument: choose a CRAN mirror close to the location of your cluster or use https://cloud.r-project.org/.
The first time you install a package, R will ask you whether you want to create a personal library in your home directory. Answer yes to both questions. Your packages will now install under ~/.
Some packages require additional modules to be loaded before they can be installed. Other packages need additional R packages as dependencies. In either case, you will get explicit error messages. Adding the argument dependencies = TRUE helps in the second case, but you will still have to add packages manually from time to time.
Let’s install the packages needed for this webinar:
install.packages(
c("tidyverse", "bench", "doFuture", "doRNG", "randomForest", "Rcpp"),
repos = "https://mirror.rcg.sfu.ca/mirror/CRAN/" # closest mirror to Cedar
)
This will also install the dependencies foreach, future, and iterators.
To leave the R console, press <Ctrl+D>.
To run an R script called <your_script>.R, you first need to write a job script.
Example:
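(A sketch; the account name and all resource values are illustrative and should be adapted to your needs.)

#!/bin/bash
#SBATCH --account=def-someuser   # replace with your own account
#SBATCH --time=00:15:00          # walltime
#SBATCH --mem-per-cpu=3500M      # memory per CPU core
#SBATCH --cpus-per-task=1        # adjust to your needs
module load StdEnv/2020 gcc/9.3.0 r/4.2.1
Rscript <your_script>.R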
Then launch your job with:
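Assuming the job script above was saved as <your_job>.sh (the name is up to you):

sbatch <your_job>.sh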
You can monitor your job with sq (an alias for squeue -u $USER $@).
While it is fine to run R on the login node when you install packages, you must start a SLURM job before any heavy computation.
To run R interactively, you should launch an salloc session.
Here is what I will use for this webinar:
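Something along these lines (the values are illustrative, apart from the 8 tasks used in the rest of this webinar):

salloc --time=1:10:00 --mem-per-cpu=3500M --ntasks=8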
This takes me to a compute node where I can launch R to run computations:
The first thing to do if you want to improve your code efficiency is to identify the bottlenecks in your code. Common tools are Rprof(), the sampling profiler that comes with base R, and profvis.
profvis is a newer tool, built by Posit (formerly RStudio). Under the hood, it runs Rprof() to collect data, then produces an interactive HTML widget with a flame graph that allows for easy visual identification of slow sections of code. While this tool integrates well with the RStudio IDE or the RPubs ecosystem, it is not very well suited for remote work on a cluster. One option is to profile your code with small data on your own machine. Another option is to use the base profiler with Rprof() directly, as in the example below.
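Here is a minimal sketch of that approach (the profiled expression is just a placeholder):

Rprof("profiling.out")                              # start profiling, write samples to a file
x <- lapply(1:100, function(i) sort(runif(1e5)))    # code you want to profile
Rprof(NULL)                                         # stop profiling
summaryRprof("profiling.out")                       # summary of time spent in each function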
Once you have identified expressions that are particularly slow, you can use benchmarking tools to compare variations of the code.
In the most basic fashion, you can use system.time(), but this is limited and imprecise.
The microbenchmark package is a popular option.
It gives the minimum time, lower quartile, mean, median, upper quartile, and maximum time of R expressions.
The newer bench package has less overhead, is more accurate, and—for sequential code—gives information on memory usage and garbage collections. This is the package I will use today.
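As a quick illustration of its usage (the two expressions compared here are arbitrary):

library(bench)
x <- runif(1e6)
# Compare two equivalent ways of computing the square roots of x
mark(
  sqrt(x),
  x^0.5
)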
We talk about multi-threading when a single process (with its own memory) runs multiple threads.
The execution can happen in parallel (if each thread has access to its own CPU core) or concurrently, with some threads taking turns on the same CPU cores.
Because all threads in a process write to the same memory addresses, multi-threading can lead to race conditions.
Multi-threading does not seem to be a common approach to parallelizing R code.
Multi-processing in shared memory happens when multiple processes execute code on multiple CPU cores of a single node (or a single machine).
The different processes need to communicate with each other, but because they are all running on the CPU cores of a single node, messages can pass via shared memory.
When processes involved in the execution of some code run on multiple nodes of a cluster, messages between them need to travel over the cluster interconnect. In that case, we talk about distributed memory.
The parallel package has been part of the "base" package group since version 2.14.0, which means that it comes with R.
Most parallel approaches in R build on this package.
We will make use of it to create and close an ad-hoc cluster.
The parallelly package adds functionality to the parallel package.
The foreach package implements a looping construct without an explicit counter. It doesn't require the preallocation of an output container, it brings to R an equivalent of the Python or Julia list comprehensions, and, most importantly, it allows for easy execution of loops in parallel. Unlike for loops, it returns a value (loops are used only for their side effects).
Let’s look at an example to calculate the sum of 1e4 random vectors of length 3.
We will use foreach and iterators (which creates convenient iterators for foreach):
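Load both packages first:

library(foreach)
library(iterators)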
Classic while loop:
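One way to write it (a sketch; the variable name result1 is illustrative):

set.seed(2)
result1 <- numeric(3)          # preallocate the output vector
i <- 0
while (i < 1e4) {
  result1 <- result1 + runif(3)
  i <- i + 1
}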
With foreach:
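The foreach equivalent, with the same seed so that the results can be compared (result2 is an illustrative name):

set.seed(2)
result2 <- foreach(icount(1e4), .combine = '+') %do% runif(3)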
Verify:
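Both approaches should produce the same sum:

all.equal(result1, result2)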
The best part of foreach is that you can turn sequential loops into parallel ones by registering a parallel backend and replacing %do% with %dopar%.
There are many parallelization backends available: doFuture, doMC, doMPI, doParallel, doRedis, doRNG, doSNOW, and doAzureParallel.
In this webinar, I will use doFuture, which allows foreach expressions to be evaluated following any of the strategies of the future package.

So first, what is the future package?
A future is an object that acts as an abstract representation for a value in the future. A future can be resolved (if the value has been computed) or unresolved. If the value is queried while the future is unresolved, the process is blocked until the future is resolved.
Futures allow for asynchronous and parallel evaluations. The future package provides a simple and unified API to evaluate futures.
The future package does this thanks to the plan function:

- plan(sequential): futures are evaluated sequentially in the current R session
- plan(multisession): futures are evaluated by new R sessions spawned in the background (multi-processing in shared memory)
- plan(multicore): futures are evaluated in processes forked from the existing process (multi-processing in shared memory)
- plan(cluster): futures are evaluated on an ad-hoc cluster (allows for distributed parallelism across multiple nodes)

To ensure a consistent behaviour across plans, all evaluations are done in a local environment:
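Here is a minimal sketch of what this means (an assignment made inside a future does not leak into the calling session, even with the sequential plan):

library(future)
plan(sequential)
x <- 1
f <- future({
  x <- 2     # modifies a variable local to the future
  x
})
value(f)     # 2
x            # still 1 in the calling session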
We had:
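(This is the sequential foreach version from above.)

set.seed(2)
result2 <- foreach(icount(1e4), .combine = '+') %do% runif(3)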
We can replace %do% with %dopar%:
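For instance:

set.seed(2)
result3 <- foreach(icount(1e4), .combine = '+') %dopar% runif(3)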
Since we haven’t registered any parallel backend, the expression will still be evaluated sequentially.
To run this in parallel, we need to load doFuture, register it as a backend (with registerDoFuture()), and choose a parallel strategy (e.g. plan(multicore)):
library(foreach)
library(doFuture)
registerDoFuture()
plan(multicore)
set.seed(2)
result3 <- foreach(icount(1e4), .combine = '+') %dopar% runif(3)
With the overhead of parallelization, it actually doesn't make sense to parallelize such a short computation, so let's go over a toy example and do some benchmarking.
For this toy example, I will use a modified version of one of the examples in the foreach vignette: I will build a classification model made of a forest of decision trees thanks to the randomForest package.
Because the code includes randomly generated numbers, I will use the doRNG package, which replaces foreach::%dopar% with doRNG::%dorng%. This follows the recommendations of Pierre L'Ecuyer (1999) and ensures reproducibility.
library(doFuture) # This will also load the `future` package
library(doRNG) # This will also load the `foreach` package
library(randomForest)
library(bench) # To do some benchmarking
Loading required package: foreach
Loading required package: future
Loading required package: rngtools
The goal is to create a classifier based on some data (here a matrix of random numbers for simplicity) and a response variable (as a factor). This model could then be passed to the predict() function with novel data to generate classification predictions. But here we are only interested in the creation of the model, as this is the part that is computationally intensive; we aren't interested in actually using it.
set.seed(11)
traindata <- matrix(runif(1e5), 100)
fac <- gl(2, 50)
rf <- foreach(ntree = rep(250, 8), .combine = combine) %do%
randomForest(x = traindata, y = fac, ntree = ntree)
rf
Call:
randomForest(x = traindata, y = fac, ntree = ntree)
Type of random forest: classification
Number of trees: 2000
No. of variables tried at each split: 31
This is the non-parallelizable code with %do%:
tref <- mark(
rf1 <- foreach(ntree = rep(250, 8), .combine = combine) %do%
randomForest(x = traindata, y = fac, ntree = ntree),
memory = FALSE
)
tref$median
[1] 5.66s
This is the parallelizable foreach code, but run sequentially:
registerDoFuture() # Set the parallel backend
plan(sequential) # Set the evaluation strategy
# Using bench::mark()
tseq <- mark(
rf2 <- foreach(ntree = rep(250, 8), .combine = combine) %dorng%
randomForest(x = traindata, y = fac, ntree = ntree),
memory = FALSE
)
tseq$median
[1] 5.78s
No surprise: those are similar.
future provides availableCores() to detect the number of available cores:
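Calling it from within our current job:

availableCores()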
system
4
Similar to parallel::detectCores().
This detects the number of CPU cores available to me on the current compute node, that is, what I can use for shared memory multi-processing.
Shared memory multi-processing can be run with plan(multisession), which will spawn new R sessions in the background to evaluate futures:
plan(multisession)
tms <- mark(
rf2 <- foreach(ntree = rep(250, 8), .combine = combine) %dorng%
randomForest(x = traindata, y = fac, ntree = ntree),
memory = FALSE
)
tms$median
[1] 2s
We got a speedup of 5.78 / 2 = 2.9.
Shared memory multi-processing can also be run with plan(multicore) (except on Windows), which will fork the current R process to evaluate futures:
plan(multicore)
tmc <- mark(
rf2 <- foreach(ntree = rep(250, 8), .combine = combine) %dorng%
randomForest(x = traindata, y = fac, ntree = ntree),
memory = FALSE
)
tmc$median
[1] 1.9s
We got a very similar speedup of 5.78 / 1.9 = 3.0.
I requested 8 tasks from Slurm on a training cluster made of nodes with 4 CPU cores each. Let's verify that I got them by accessing the SLURM_NTASKS environment variable from within R:
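One way to do this (a sketch):

as.numeric(Sys.getenv("SLURM_NTASKS"))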
[1] 8
I can create a character vector with the name of the node each task is running on:
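For instance (a sketch; the exact shell command may differ):

hosts <- system("srun hostname -s", intern = TRUE)
str(hosts)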
chr [1:8] "node1" "node1" "node1" "node1" "node2" "node2" "node2" "node2"
This allows me to create a cluster of workers:
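For example, with the parallel package (a sketch):

library(parallel)
cl <- makeCluster(hosts)
cl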
socket cluster with 8 nodes on hosts ‘node1’, ‘node2’
I can now try the code with distributed parallelism using all 8 CPU cores across both nodes:
plan(cluster, workers = cl)
tdis <- mark(
rf2 <- foreach(ntree = rep(250, 8), .combine = combine) %dorng%
randomForest(x = traindata, y = fac, ntree = ntree),
memory = FALSE
)
tdis$median
[1] 1.14s
Speedup: 5.78 / 1.14 = 5.1.
The cluster of workers can be stopped with:
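For example:

stopCluster(cl)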
The multidplyr package partitions data frames across worker processes, allows you to run the usual tidyverse functions on each partition, then collects the processed data.
The furrr package is a parallel equivalent to the purrr package from the tidyverse.
If you work with genomic data, you might want to have a look at the BiocParallel package from Bioconductor.
Yet another option to run distributed R code is to use the sparklyr package (an R interface to Spark).
Rmpi is a wrapper to MPI (Message-Passing Interface). It has proved slow and problematic on Cedar though.
The boot package provides functions and datasets specifically for bootstrapping in parallel.
Rcpp provides C++ classes with mappings to R's .Call(). C++ functions can be declared in source files or directly in R scripts.
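As a small illustration (a sketch, not part of the original example), a C++ function can be defined from within an R script with Rcpp::cppFunction():

library(Rcpp)

# Define a C++ function directly from R
cppFunction('
  double sumC(NumericVector x) {
    double total = 0;
    for (int i = 0; i < x.size(); ++i) total += x[i];
    return total;
  }
')

sumC(c(1, 2, 3))   # returns 6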