foreach and doFuture

Author

Marie-Hélène Burle

One of the options for parallelizing code with the future package is to use foreach with the doFuture backend. In this section, we will go over an example with the random forest algorithm.

Our example code: random forest

On the iris dataset

Random forest is a commonly used ensemble learning technique for classification and regression. The idea is to combine the results from many decision trees on bootstrap samples of the dataset to improve the predictive accuracy and control over-fitting. The algorithm used was developed by Tin Kam Ho, then improved by Leo Breiman and Adele Cutler. An implementation in R is provided by the randomForest() function from the randomForest package. Let’s use it to classify the iris dataset that comes packaged with R.

First, let’s have a look at the dataset:

# Structure of the dataset
str(iris)
'data.frame':   150 obs. of  5 variables:
 $ Sepal.Length: num  5.1 4.9 4.7 4.6 5 5.4 4.6 5 4.4 4.9 ...
 $ Sepal.Width : num  3.5 3 3.2 3.1 3.6 3.9 3.4 3.4 2.9 3.1 ...
 $ Petal.Length: num  1.4 1.4 1.3 1.5 1.4 1.7 1.4 1.5 1.4 1.5 ...
 $ Petal.Width : num  0.2 0.2 0.2 0.2 0.2 0.4 0.3 0.2 0.2 0.1 ...
 $ Species     : Factor w/ 3 levels "setosa","versicolor",..: 1 1 1 1 1 1 1 1 1 1 ...
# Dimensions of the dataset
dim(iris)
[1] 150   5
# First 6 data points
head(iris)
  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1          5.1         3.5          1.4         0.2  setosa
2          4.9         3.0          1.4         0.2  setosa
3          4.7         3.2          1.3         0.2  setosa
4          4.6         3.1          1.5         0.2  setosa
5          5.0         3.6          1.4         0.2  setosa
6          5.4         3.9          1.7         0.4  setosa
# The 3 species (3 levels of the factor)
levels(iris$Species)
[1] "setosa"     "versicolor" "virginica" 

The goal is to create a random forest model (let’s call it rf) that can classify an iris flower in one of the 3 species based on the 4 measurements of its sepals and petals.

library(randomForest)
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.
set.seed(123)
rf <- randomForest(Species ~ ., data=iris)

Our response variable (Species) is a factor, so classification is assumed.

The . on the right side of the formula represents all the other variables (so we are using all variables other than the response variable Species as feature variables).
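
For reference, the . shorthand expands to listing every remaining column explicitly. With this dataset, the following call (rf_explicit is just an illustrative name) fits the same kind of model:

rf_explicit <- randomForest(
  Species ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width,
  data = iris
)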

rf

Call:
 randomForest(formula = Species ~ ., data = iris) 
               Type of random forest: classification
                     Number of trees: 500
No. of variables tried at each split: 2

        OOB estimate of  error rate: 4.67%
Confusion matrix:
           setosa versicolor virginica class.error
setosa         50          0         0        0.00
versicolor      0         47         3        0.06
virginica       0          4        46        0.08

As can be seen from the confusion matrix, our model performs well.

We can use it on new data to make predictions. Let’s try with some made-up data:

new_data <- data.frame(
  Sepal.Length = c(5.3, 4.6, 6.5),
  Sepal.Width = c(3.1, 3.9, 2.5),
  Petal.Length = c(1.5, 1.5, 5.0),
  Petal.Width = c(0.2, 0.1, 2.1)
)

new_data
  Sepal.Length Sepal.Width Petal.Length Petal.Width
1          5.3         3.1          1.5         0.2
2          4.6         3.9          1.5         0.1
3          6.5         2.5          5.0         2.1
predict(rf, new_data)
        1         2         3 
   setosa    setosa virginica 
Levels: setosa versicolor virginica

Let’s make it big

Now, the iris dataset only has 150 observations and we used the default number of trees (500) of the randomForest() function, so things ran fast. Often, random forests are run on large datasets. Let’s artificially increase the iris dataset and use more trees to create a situation in which parallelization would make sense.

One easy way is to replicate each row 100 times (and we can then delete the row names that get created by this operation):

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL
dim(big_iris)
[1] 15000     5

And then we can run randomForest() on this dataset with 2000 trees.

Hidden parallelism check

Before parallelizing your code, remember to check whether the package you are using already does some parallelization under the hood (after all, maybe the randomForest package runs things in parallel; we don’t know yet).

One way to do this is to test the package on your local machine and, while some sample code is running, to open htop and see how many cores are used.

Why do this on your local machine? Because on the cluster, if you launch htop while your batch job is running, you will be looking at processes running on the login node while your code runs on the compute node(s), so this will not help you. You could salloc onto one of the compute nodes running your job and run htop there, but on production clusters compute nodes are large and you will also see the processes of all the other users of that node. This test is simply easier to do locally.

On my machine I ran:

library(randomForest)

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

set.seed(123)
rf <- randomForest(Species ~ ., data=big_iris, ntree=2000)

And I could confirm that the function does not run in parallel.

So let’s parallelize this code.

The foreach package

The foreach package provides a construct for repeated executions, i.e. it can replace for loops, while loops, repeat loops, and functional programming code written with the *apply functions or the purrr package. The foreach vignette gives many examples.
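
To get a feel for the syntax, here is a minimal toy example (unrelated to our random forest code) replacing a for loop with a foreach expression:

library(foreach)

# A regular for loop accumulating squares
res_loop <- numeric(3)
for (i in 1:3) res_loop[i] <- i^2

# The equivalent foreach expression returns the combined result directly
res_foreach <- foreach(i = 1:3, .combine = c) %do% i^2

identical(res_loop, res_foreach)   # TRUE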

The doFuture package

The most useful feature of foreach is that it makes parallelization easy thanks to a number of backends: doFuture, doMC, doMPI, doParallel, doRedis, doRNG, doSNOW, and doAzureParallel.

The doFuture package is the most modern of these backends. It makes it easy to evaluate foreach expressions with any of the evaluation strategies of the future package. All you have to do is register it as a backend, declare the future evaluation strategy of your choice, make sure to generate parallel-safe random numbers for reproducibility (if your code uses randomness), and replace %do% with %dofuture%.
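
Schematically, the pattern looks like this (a minimal sketch, not our random forest code; the plan could be any of the evaluation strategies covered below):

library(doFuture)    # also loads foreach and future

registerDoFuture()   # register doFuture as the foreach backend
plan(multisession)   # or sequential, multicore, cluster, ...

# seed = TRUE generates parallel-safe random numbers for reproducibility
x <- foreach(i = 1:4, .options.future = list(seed = TRUE)) %dofuture%
  rnorm(1, mean = i)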

Benchmarks

We will run and benchmark all versions of our code by submitting batch jobs to Slurm.

Initial code

First, let’s benchmark the initial code (non-parallel, not using foreach). We need to create an R script; let’s call it reference.R (I will use Emacs, but you can write the script with the nano text editor if you prefer):

reference.R
library(randomForest)
library(bench)

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cat("\nBenchmarking results:\n\n")

set.seed(123)
mark(rf <- randomForest(Species ~ ., data=big_iris, ntree=2000))

Then we need to create a Bash script for Slurm. Let’s call it reference.sh:

reference.sh
#!/bin/bash
#SBATCH --time=5
#SBATCH --mem-per-cpu=7500M

Rscript reference.R

You can see the full list of sbatch options here.

And now we submit the job with:

sbatch reference.sh

You can monitor your job with sq. The result will be written to a file called slurm-xx.out, where xx is the number of the job that just ran. To see the result, we can simply print the content of that file to screen with cat (you can run ls to see the list of files in the current directory). Make sure that your job has finished running before printing the result (otherwise you might get partial output, which can be confusing).

cat slurm-xx.out    # Replace xx by the job number
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

Benchmarking results:

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf <- random… 6.33s  6.33s     0.158        NA    0.474     1     3      6.33s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

foreach expression

Now, let’s try the foreach version:

foreach.R
library(foreach)
library(randomForest)
library(bench)

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cat("\nBenchmarking results:\n\n")

set.seed(123)
mark(
  rf <- foreach(ntree = rep(250, 8), .combine = combine) %do%
    randomForest(Species ~ ., data=big_iris, ntree=ntree)
)
foreach.sh
#!/bin/bash
#SBATCH --time=5
#SBATCH --mem-per-cpu=7500M

Rscript foreach.R
sbatch foreach.sh
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

Benchmarking results:

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf <- foreac… 7.04s  7.04s     0.142        NA     4.55     1    32      7.04s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

The foreach expression is slower than the standard expression (this is always the case: foreach adds overhead, which only pays off once it is offset by parallelization).

Plan sequential

You might wonder why the sequential evaluation strategy exists (i.e. why go through all the trouble of writing your code with foreach and doFuture only to run it without parallelism?).

There are many reasons:

  • It can be very useful for debugging.
  • It makes it easy to switch the futures execution strategy back and forth for different sections of the code (maybe you don’t want to run everything in parallel).
  • It allows other people to run the same code on their different hardware without changing it (if they don’t have the resources to run things in parallel, they only have to change the execution strategy).

To turn the code into a parallelizable version with doFuture, we replace %do% with %dofuture%.

Here, we also need to use the option .options.future = list(seed = TRUE): whenever your parallel code relies on a random process, it isn’t enough to use set.seed() to ensure reproducibility; you also need to generate parallel-safe random numbers. In random forest, each tree is trained on a random subset of the data and random variables are selected for splitting at each node. The option .options.future = list(seed = TRUE) pre-generates the random seeds using L’Ecuyer-CMRG RNG streams.

This is the parallelizable foreach code, but run sequentially:

sequential.R
library(doFuture)    # Also loads foreach and future
library(randomForest)
library(bench)

registerDoFuture()   # Set the parallel backend
plan(sequential)     # Set the evaluation strategy

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cat("\nBenchmarking results:\n\n")

set.seed(123)
mark(
  rf <- foreach(
    ntree = rep(250, 8),
    .options.future = list(seed = TRUE),
    .combine = combine
  ) %dofuture%
    randomForest(Species ~ ., data=big_iris, ntree=ntree)
)
sequential.sh
#!/bin/bash
#SBATCH --time=5
#SBATCH --mem-per-cpu=7500M

Rscript sequential.R
sbatch sequential.sh
Loading required package: foreach
Loading required package: future
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

Benchmarking results:

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf <- foreac… 8.39s  8.39s     0.119        NA     3.81     1    32      8.39s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

Each time we add unnecessary complexity to the code, things run a little slower.

Multi-processing in shared memory

Now, it is time to parallelize. First, we will use multiple cores on a single node (shared-memory parallelism).

Number of cores

The future package provides the availableCores() function to detect the number of available cores. We will run it as part of our script as a check on our available hardware.

The cluster for this course is made of 20 nodes with 4 CPUs each. We want to test shared memory parallelism, so our job needs to stay within one node. We can thus ask for a maximum of 4 CPUs and we want to ensure that we aren’t getting them on different nodes. Let’s go with that maximum of 4 cores.

Multisession

Shared-memory multi-processing can be run with plan(multisession), which spawns new R sessions in the background to evaluate the futures.

multisession.R
library(doFuture)
library(randomForest)
library(bench)

# Check number of cores:
cat("\nWe have", availableCores(), "cores.\n\n")

registerDoFuture()   # Set the parallel backend
plan(multisession)   # Set the evaluation strategy

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cat("\nBenchmarking results:\n\n")

set.seed(123)
mark(
  rf <- foreach(
    ntree = rep(250, 8),
    .options.future = list(seed = TRUE),
    .combine = combine
  ) %dofuture%
    randomForest(Species ~ ., data=big_iris, ntree=ntree)
)
multisession.sh
#!/bin/bash
#SBATCH --time=5
#SBATCH --mem-per-cpu=7500M
#SBATCH --cpus-per-task=4

Rscript multisession.R
sbatch multisession.sh
Loading required package: foreach
Loading required package: future
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

We have 4 cores.

Benchmarking results:

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf <- foreac… 2.72s  2.72s     0.368        NA     2.21     1     6      2.72s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

Speedup: 3.1.

Not too bad, considering that the ideal speedup, without any overhead, would be 4.
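
For reference, the speedup figures quoted in this section correspond to the benchmark time of the sequential plan divided by the parallel time (here 8.39 s and 2.72 s):

8.39 / 2.72
[1] 3.084559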

#SBATCH --nodes=1
#SBATCH --ntasks-per-node=4

could be used instead of:

#SBATCH --cpus-per-task=4

What matters is to have 4 cores running on the same node to be in a shared memory parallelism scenario.

Multicore

Shared-memory multi-processing can also be run with plan(multicore) (except on Windows), which forks the current R process to evaluate the futures.

multicore.R
library(doFuture)
library(randomForest)
library(bench)

# Check number of cores:
cat("\nWe have", availableCores(), "cores.\n\n")

registerDoFuture()   # Set the parallel backend
plan(multicore)      # Set the evaluation strategy

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cat("\nBenchmarking results:\n\n")

set.seed(123)
mark(
  rf <- foreach(
    ntree = rep(250, 8),
    .options.future = list(seed = TRUE),
    .combine = combine
  ) %dofuture%
    randomForest(Species ~ ., data=big_iris, ntree=ntree)
)
multicore.sh
#!/bin/bash
#SBATCH --time=5
#SBATCH --mem-per-cpu=7500M
#SBATCH --cpus-per-task=4

Rscript multicore.R
sbatch multicore.sh
Loading required package: foreach
Loading required package: future
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

We have 4 cores.

Benchmarking results:

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf <- foreac… 3.15s  3.15s     0.318        NA     13.7     1    43      3.15s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

Speedup: 2.7.

While in theory we should get a similar speedup, we are getting a lower one here.

Multi-processing in distributed memory

Let’s run our distributed parallel code using 8 cores across 2 nodes.

We need to create a cluster of workers. We do this by creating a character vector with the names of the nodes our tasks are running on and passing it to the makeCluster() function from the parallel package (included in R):

# Create a character vector with the node names
hosts <- system("srun hostname -s", intern = TRUE)

# Create the cluster of workers
cl <- parallel::makeCluster(hosts)

We can verify that we did get 8 tasks by accessing the SLURM_NTASKS environment variable from within R:

as.numeric(Sys.getenv("SLURM_NTASKS"))

Here is the R script:

distributed.R
library(doFuture)
library(randomForest)
library(bench)

# Check number of tasks:
cat("\nWe have", as.numeric(Sys.getenv("SLURM_NTASKS")), "tasks.\n\n")

# Create a character vector with the node names
hosts <- system("srun hostname -s", intern = TRUE)

# Look at the location of our tasks:
cat("\nOur tasks are running on the following nodes: ", hosts)

# Create the cluster of workers
cl <- parallel::makeCluster(hosts)

registerDoFuture()           # Set the parallel backend
plan(cluster, workers = cl)  # Set the evaluation strategy

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cat("\nBenchmarking results:\n\n")

set.seed(123)
mark(
  rf <- foreach(
    ntree = rep(250, 8),
    .options.future = list(seed = TRUE),
    .combine = combine
  ) %dofuture%
    randomForest(Species ~ ., data=big_iris, ntree=ntree)
)

The cluster of workers can be stopped with:

parallel::stopCluster(cl)

Here this is not necessary since our job stops running as soon as the execution is complete, but on other systems this will prevent you from monopolizing hardware or paying for it unnecessarily.

And now we need to ask Slurm for 8 tasks on 2 nodes:

distributed.sh
#!/bin/bash
#SBATCH --time=5
#SBATCH --mem-per-cpu=7500M
#SBATCH --ntasks-per-node=4
#SBATCH --nodes=2

Rscript distributed.R
sbatch distributed.sh
Loading required package: foreach
Loading required package: future
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

We have 8 tasks.

Our tasks are running on the following nodes: "node1" "node1" "node1" "node1" "node2" "node2" "node2" "node2"

Benchmarking results:

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf <- foreac…  1.6s   1.6s     0.624        NA     3.12     1     5       1.6s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

Speedup: 5.2.

The overhead is larger in distributed parallelism due to message passing between nodes. We are further from the ideal speedup of 8, but we still got a speedup larger than what we could have obtained with shared-memory parallelism.

#SBATCH --ntasks=8

could be used instead of:

#SBATCH --ntasks-per-node=4
#SBATCH --nodes=2

The latter is slightly better because it gives us 2 full nodes instead of tasks spread across any number of nodes. However, it also means that we might have to wait longer for our job to start, as the request is more restrictive.