Partitioning data with multidplyr

Author

Marie-Hélène Burle

The package multidplyr provides simple techniques to partition data across a set of workers on the same node.

Data partitioning for memory

Case example

What if we have an even bigger dataset?

The randomForest() function has limitations:

  • It is a memory hog.
  • It doesn’t run if your data frame has too many rows.

If you try to run:

bigger.R
library(randomForest)

bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e3), ]
rownames(bigger_iris) <- NULL

set.seed(123)
rf <- randomForest(Species ~ ., data = bigger_iris)

rf

on a single core, you will get:

randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.
/var/spool/slurmd/job00016/slurm_script: line 5: 74451 Killed                  Rscript data_partition.R
slurmstepd: error: Detected 1 oom-kill event(s) in StepId=16.batch. Some of your processes may have been killed by the cgroup out-of-memory handler.

You have run out of memory.

Reducing the number of trees won’t help as the problem comes from the size of the data frame.

Similarly, using foreach and doFuture as we did previously won’t help either: that approach spreads the trees across cores but, again, the problem doesn’t come from the number of trees. It comes from the size of the dataset.

With plan(multisession), you would get:

Cluster with multisession
Error in unserialize(node$con) :
  MultisessionFuture (doFuture2-3) failed to receive message results from cluster RichSOCKnode #3 (PID 445273 on localhost ‘localhost’). The reason reported was ‘error reading from connection’. Post-mortem diagnostic: No process exists with this PID, i.e. the localhost worker is no longer alive. The total size of the 3 globals exported is 5.15 MiB. There are three globals: ‘big_iris’ (5.15 MiB of class ‘list’), ‘...future.seeds_ii’ (160 bytes of class ‘list’) and ‘...future.x_ii’ (112 bytes of class ‘list’)

And with plan(multicore):

Cluster with multicore
Error: Failed to retrieve the result of MulticoreFuture (doFuture2-2) from the forked worker (on localhost; PID 444769). Post-mortem diagnostic: No process exists with this PID, i.e. the forked localhost worker is no longer alive. The total size of the 3 globals exported is 5.15 MiB. There are three globals: ‘big_iris’ (5.15 MiB of class ‘list’), ‘...future.seeds_ii’ (160 bytes of class ‘list’) and ‘...future.x_ii’ (112 bytes of class ‘list’)
In addition: Warning message:
In mccollect(jobs = jobs, wait = TRUE) :
  1 parallel job did not deliver a result

You can even try spreading the trees across multiple nodes, but that will fail as well, this time without any error message.

Of course, you could always try a different machine, one with more memory. I used my own machine, which has more memory than this training cluster, and it worked.

But then, what if the dataset is even bigger? Say, for instance, that we have:

bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e4), ]

Then no amount of memory will save you and you will get errors similar to this:

Error in randomForest.default(m, y, ...) : 
  long vectors (argument 28) are not supported in .C

That’s because randomForest() does not accept datasets with too many rows.
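
A bit of arithmetic helps read this error. In R, a long vector is one with more than 2^31 − 1 elements, and the .C() interface does not support them. The sketch below only compares the row count with that limit. The error does not say which internal array is passed as argument 28, so the per-row multiplier computed at the end is purely illustrative.

```r
# Number of rows in the enlarged dataset (150 rows, each repeated 1e4 times)
n_rows <- nrow(iris) * 1e4

# Largest length a regular (non-long) vector can have: 2^31 - 1
max_len <- .Machine$integer.max

# The row count alone is far below the limit...
n_rows < max_len

# ...so the long vector has to be an internal array that grows with the data.
# Illustrative only: how many values per row such an array would need to hold
# before it crosses the limit
max_len / n_rows
```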

The bottom line is that there are situations in which the data is just too big. In such cases, you want to look at data parallelism: instead of splitting your code into tasks that can run in parallel as we did previously, you split the data into chunks and run the code in parallel on those chunks.
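
As a minimal base R sketch of this idea (not the approach used in the rest of this section), the data can be split into chunks by hand and a function applied to each chunk with the parallel package. The helper count_species() and the chunk count are assumptions made up for this illustration; any function that takes a data frame could be used instead.

```r
library(parallel)

# Small stand-in dataset, built the same way as in the text but much smaller
demo_iris <- iris[rep(seq_len(nrow(iris)), each = 10), ]
rownames(demo_iris) <- NULL

# Shuffle the rows, then assign each row to one of n_chunks chunks
set.seed(11)
n_chunks <- 4
shuffled <- demo_iris[sample(nrow(demo_iris)), ]
chunk_id <- rep(seq_len(n_chunks), length.out = nrow(shuffled))
chunks <- split(shuffled, chunk_id)

# Hypothetical per-chunk task: count the observations of each species
count_species <- function(df) table(df$Species)

# mclapply() relies on forking, which is unavailable on Windows,
# so fall back to a single core there
n_cores <- if (.Platform$OS.type == "windows") 1L else 2L
results <- mclapply(chunks, count_species, mc.cores = n_cores)

# Combine the per-chunk results into a single result
total <- Reduce(`+`, results)
total
```

The three steps (split, apply in parallel, combine) are the same ones that multidplyr handles for us below.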

Of course, you could also simply run the code on a subset of your data. In many situations, reducing your data by sampling it properly will be good enough. But there are situations in which you want to use a huge dataset.

You could split the data manually and run the code on each chunk, but that would be tedious and slow. You could also write your own code to run all the chunks in parallel. The multidplyr package provides a much simpler option.

Using multidplyr

To see what happens as we use multidplyr, let’s first run the code in an interactive session on one node with 4 cores:

# Launch the interactive job
salloc --time=50 --mem-per-cpu=7500M --cpus-per-task=4

# Then launch R
R

First, we load the packages that we need in the main session:

library(multidplyr)
library(dplyr, warn.conflicts = FALSE)

We load dplyr for the do() function.

Notice that we aren’t loading the randomForest package yet: that’s because we will use it on workers, not in the main session.

Then we need to create a cluster of workers. Let’s use 4 workers that we will run on a full node:

cl <- new_cluster(4)
cl
4 session cluster [....]

Now we can load the randomForest package on each worker:

cluster_library(cl, "randomForest")
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

Attaching package: ‘randomForest’

The following object is masked from ‘package:dplyr’:

    combine

Of course, we need to generate our big dataset:

bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e3), ]
rownames(bigger_iris) <- NULL

Then we create a partitioned data frame on the workers with the partition() function. The function will try to split the data as evenly as possible among workers.

If you group observations by some variable (with dplyr::group_by()) beforehand, multidplyr will ensure that all data points in a group end up on the same worker. This is very convenient in a lot of cases, but is not relevant here. Without grouping observations first, it is unclear how partition() chooses which observation goes to which worker. In our data, we have all the setosa observations first, then all the versicolor, and finally all the virginica. We want to make sure that the randomForest() function runs on a sample of all 3 species. We will thus randomly shuffle the data before partitioning it (when we were parallelizing by splitting the trees, we didn’t have to worry about that since each subset of trees was running on the entire dataset):

# Shuffle the rows of the data frame randomly
set.seed(11)
bigger_iris_shuffled <- bigger_iris[sample(nrow(bigger_iris)), ]

# You can check that they are shuffled
head(bigger_iris_shuffled)
       Sepal.Length Sepal.Width Petal.Length Petal.Width    Species
65570           6.7         3.1          4.4         1.4 versicolor
19004           5.1         3.8          1.5         0.3     setosa
73612           6.1         2.8          4.7         1.2 versicolor
28886           5.2         3.4          1.4         0.2     setosa
121310          5.6         2.8          4.9         2.0  virginica
21667           5.1         3.7          1.5         0.4     setosa

# Create the partitioned data frame
split_iris <- partition(bigger_iris_shuffled, cl)
split_iris
Source: party_df [150,000 x 5]
Shards: 4 [37,500--37,500 rows]

# A data frame: 150,000 × 5
  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
         <dbl>       <dbl>        <dbl>       <dbl> <fct>
1          6.7         3.1          4.4         1.4 versicolor
2          5.6         2.8          4.9         2   virginica
3          6.4         2.8          5.6         2.2 virginica
4          5.6         2.5          3.9         1.1 versicolor
5          4.7         3.2          1.6         0.2 setosa
6          6.7         3            5           1.7 versicolor
# ℹ 149,994 more rows
# ℹ Use `print(n = ...)` to see more rows
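
To see why shuffling matters, here is a small base R sketch. The contiguous split into 4 equal blocks is a hypothetical worst case chosen for illustration. It is not a description of how partition() actually assigns rows.

```r
# Smaller stand-in for bigger_iris, still sorted by species
sorted_iris <- iris[rep(seq_len(nrow(iris)), each = 10), ]
rownames(sorted_iris) <- NULL

# Hypothetical assignment: 4 contiguous blocks of equal size
n_workers <- 4
block <- rep(seq_len(n_workers), each = nrow(sorted_iris) / n_workers)

# Species counts per block without shuffling
unshuffled_counts <- table(block, sorted_iris$Species)

# Species counts per block after shuffling the rows
set.seed(11)
shuffled_iris <- sorted_iris[sample(nrow(sorted_iris)), ]
shuffled_counts <- table(block, shuffled_iris$Species)

unshuffled_counts
shuffled_counts
```

Without shuffling, no block in this sketch contains all 3 species, so a model fitted on such a block could never learn the missing ones.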

If we want the code to be reproducible, we should set the seed on each worker:

cluster_send(cl, set.seed(123))

Run cluster_send() to send code to each worker when you aren’t interested in any result (as is the case here) and cluster_call() if you want a computation to be executed on each worker and a result to be returned.
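
The difference between the two functions can be sketched as follows. The 2-worker cluster cl_demo is a name made up for this example. Note that sending the same seed to every worker puts all the workers on the same random number stream: assuming that each worker uses R's default random number generator, the two draws below should be identical.

```r
library(multidplyr)

# Hypothetical small cluster used only for this demonstration
cl_demo <- new_cluster(2)

# cluster_send(): run code on each worker and discard any result
cluster_send(cl_demo, set.seed(123))

# cluster_call(): run code on each worker and return a list of results
draws <- cluster_call(cl_demo, runif(1))
draws
```

This should not be a concern in our case, since each worker fits its model on a different shard of the data.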

Now we can run the randomForest() function on each worker:

split_rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .))

split_rfs is a partitioned data frame containing the results from each worker (the intermediate randomForest models):

split_rfs
Source: party_df [4 x 1]
Shards: 4 [1--1 rows]

# A data frame: 4 × 1
  rf
  <list>
1 <rndmFrs.>
2 <rndmFrs.>
3 <rndmFrs.>
4 <rndmFrs.>

Now we need to bring the partitioned results into the main process:

rfs <- split_rfs %>% collect()

rfs is a data frame with a single column called rf:

rfs
# A tibble: 4 × 1
  rf
  <list>
1 <rndmFrs.>
2 <rndmFrs.>
3 <rndmFrs.>
4 <rndmFrs.>

Which means that rfs$rf is a list:

typeof(rfs$rf)
[1] "list"

Each element of this list is a randomForest object (the 4 intermediate models created by the 4 workers):

rfs$rf
[[1]]

Call:
 randomForest(formula = Species ~ ., data = .)
               Type of random forest: classification
                     Number of trees: 500
No. of variables tried at each split: 2

        OOB estimate of  error rate: 0%
Confusion matrix:
           setosa versicolor virginica class.error
setosa      12500          0         0           0
versicolor      0      12500         0           0
virginica       0          0     12500           0

[[2]]

Call:
 randomForest(formula = Species ~ ., data = .)
               Type of random forest: classification
                     Number of trees: 500
No. of variables tried at each split: 2

        OOB estimate of  error rate: 0%
Confusion matrix:
           setosa versicolor virginica class.error
setosa      12500          0         0           0
versicolor      0      12500         0           0
virginica       0          0     12500           0

[[3]]

Call:
 randomForest(formula = Species ~ ., data = .)
               Type of random forest: classification
                     Number of trees: 500
No. of variables tried at each split: 2

        OOB estimate of  error rate: 0%
Confusion matrix:
           setosa versicolor virginica class.error
setosa      12500          0         0           0
versicolor      0      12500         0           0
virginica       0          0     12500           0

[[4]]

Call:
 randomForest(formula = Species ~ ., data = .)
               Type of random forest: classification
                     Number of trees: 500
No. of variables tried at each split: 2

        OOB estimate of  error rate: 0%
Confusion matrix:
           setosa versicolor virginica class.error
setosa      12500          0         0           0
versicolor      0      12500         0           0
virginica       0          0     12500           0

If you don’t need to explore the intermediate objects, you can combine the commands as:

rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .)) %>%
  collect()

Finally, we need to combine the 4 randomForest models into a single one. This can be done with the combine() function from the randomForest package (the same function we already used in our foreach expressions):

rf_all <- do.call(combine, rfs$rf)

Be careful: randomForest and dplyr both have a combine() function, and the one we want here is the one from the randomForest package. To avoid any conflict or confusion, you can call randomForest::combine() explicitly. A bare combine() only resolves to the randomForest version if randomForest is attached after dplyr, since functions from packages attached later mask functions of the same name from packages attached earlier.

Why are we using do.call()? If we use:

combine(rfs$rf)

We get the silly message:

Error in combine(rfs$rf) :
  Argument must be a list of randomForest objects

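That is because, despite what the message suggests, randomForest::combine() expects the randomForest objects as separate arguments (as in combine(rf1, rf2, rf3)), not as a single list. do.call() calls the function with each element of the list passed as a separate argument.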
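
The same pattern can be reproduced with a small base R stand-in. combine_numbers() is a hypothetical function written for this example and is not part of randomForest. A function that collects its inputs through ... sees a list passed directly as one single argument, whereas do.call() spreads the elements of the list into separate arguments.

```r
# Hypothetical stand-in for a function that takes its objects through `...`
combine_numbers <- function(...) {
  args <- list(...)
  # Every argument must itself be numeric; a list is not
  if (!all(vapply(args, is.numeric, logical(1)))) {
    stop("Arguments must be numeric")
  }
  sum(unlist(args))
}

parts <- list(1, 2, 3)

# Passing the list directly fails: the only argument is a list
direct <- try(combine_numbers(parts), silent = TRUE)
inherits(direct, "try-error")

# do.call() is equivalent to combine_numbers(1, 2, 3)
via_do_call <- do.call(combine_numbers, parts)
via_do_call
```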

Here is our final randomForest model:

rf_all
Call:
 randomForest(formula = Species ~ ., data = .)
               Type of random forest: classification
                     Number of trees: 2000
No. of variables tried at each split: 2

This is it: by splitting our data frame across 4 cores, we could run the code and create a randomForest model using the whole dataset.

We can test our model:

new_data <- data.frame(
  Sepal.Length = c(5.3, 4.6, 6.5),
  Sepal.Width = c(3.1, 3.9, 2.5),
  Petal.Length = c(1.5, 1.5, 5.0),
  Petal.Width = c(0.2, 0.1, 2.1)
)

predict(rf_all, new_data)
        1         2         3
   setosa    setosa virginica
Levels: setosa versicolor virginica

Running this in an interactive session was useful to see what happens, but the way you would actually do this is by writing a script (let’s call it partition.R):

partition.R
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)

# Create cluster of workers
cl <- new_cluster(4)

# Load randomForest on each worker
cluster_library(cl, "randomForest")

# Create our big data frame
bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e3), ]
rownames(bigger_iris) <- NULL

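# Shuffle the rows so that each worker gets a mix of all 3 species
set.seed(11)
bigger_iris_shuffled <- bigger_iris[sample(nrow(bigger_iris)), ]

# Create a partitioned data frame on the workers
split_iris <- partition(bigger_iris_shuffled, cl)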

# Set the seed on each worker
cluster_send(cl, set.seed(123))

# Run the randomForest() function on each worker
rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .)) %>%
  collect()

# Combine the randomForest models into one
rf_all <- do.call(combine, rfs$rf)

And run it with a Bash script (partition.sh):

partition.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=7500M
#SBATCH --cpus-per-task=4

Rscript partition.R

Conclusion

multidplyr allowed us to split our data frame across multiple workers on one node and this solved the memory issue we had with our large dataset.

Data partitioning for speed

Besides the memory advantage, are we getting any speedup from data parallelization? In other words, how does this code compare with the parallelization over the number of trees that we did with foreach and doFuture?

We want to make sure to compare the same things. So we go back to our smaller big_iris and we bring the number of trees back up to 2000.

We will compare it with the multisession and multicore plans that we ran earlier. The minimum and median times for these two shared-memory options were 2.72s and 3.15s respectively.

partition_bench.R
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(bench)

cl <- new_cluster(4)
cluster_library(cl, "randomForest")

big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL

cluster_send(cl, set.seed(123))

part_rf <- function(data, cluster) {
  split_data <- partition(data, cluster)
  rfs <- split_data %>%
    do(rf = randomForest(Species ~ ., data = ., ntree = 2000)) %>%
    collect()
  do.call(combine, rfs$rf)
}

mark(rf_all <- part_rf(big_iris, cl))

partition_bench.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=7500M
#SBATCH --cpus-per-task=4

Rscript partition_bench.R

sbatch partition_bench.sh
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.

Attaching package: ‘randomForest’

The following object is masked from ‘package:dplyr’:

    combine

# A tibble: 1 × 13
  expression      min median `itr/sec` mem_alloc `gc/sec` n_itr  n_gc total_time
  <bch:expr>    <bch> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm>
1 rf_all <- pa… 2.48s  2.48s     0.403        NA     2.02     1     5      2.48s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.

What about distributed memory?

Can multidplyr run in distributed memory? There is nothing on this in the documentation, so I tried it.

I upped the number of workers to 8 and ran the code on 2 nodes with 4 cores per node and got no speedup. I also created a dataset 10 times bigger (with each = 1e4), which creates an OOM on 4 cores on a single node, and tried it on 11 nodes with 4 cores each (10 to match the tenfold size increase, plus one to play it safe). This didn’t solve the OOM issue. I tried various other tests, all with no success.

In conclusion, it seems that multidplyr’s way of creating a cluster of workers doesn’t have a mechanism to spread them across nodes, and that the package thus does not allow splitting data across nodes.
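
One simple way to check where the workers run is to ask each of them for the name of its host. This is a sketch rather than a definitive test, and the cluster name cl_check is made up for the example. If the workers are all started on the machine running the main session, every comparison should return TRUE.

```r
library(multidplyr)

# Hypothetical small cluster used only for this check
cl_check <- new_cluster(2)

# Ask each worker for the name of the machine it is running on
worker_hosts <- unlist(cluster_call(cl_check, Sys.info()[["nodename"]]))

# Compare with the machine running the main R session
main_host <- Sys.info()[["nodename"]]
worker_hosts == main_host
```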

In cases where your data is so big that it doesn’t fit in the memory of a single node, it doesn’t seem that any R package currently allows you to split the data automatically.

Conclusion

As we could see, we got similar results: in this case, spreading the trees across 4 cores with each core using the full data (as we did with foreach and doFuture) takes about the same time as running all the trees on data spread across 4 cores.

The difference is that foreach and doFuture allowed us to spread the trees across nodes, while multidplyr does not allow this for the data.

Direct data loading

The method we used is very convenient, but it involves copying the data to the workers. If you want to save some memory, you can load the split data directly to the workers.

For this, first split your data into several files and put all those files (and only those files) in a directory.
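
As a minimal sketch of this preparation step in base R: the directory name, the file names, and the number of files below are all assumptions made for the illustration, and a small shuffled copy of iris stands in for the real data.

```r
# Small stand-in dataset, shuffled so that each file mixes all species
set.seed(11)
demo_iris <- iris[sample(nrow(iris)), ]

# Hypothetical output directory that will contain only the chunk files
data_dir <- file.path(tempdir(), "iris_chunks")
dir.create(data_dir, showWarnings = FALSE)

# Assign the rows to 4 chunks and write each chunk to its own CSV file
n_files <- 4
chunk_id <- rep(seq_len(n_files), length.out = nrow(demo_iris))
chunks <- split(demo_iris, chunk_id)
for (i in seq_along(chunks)) {
  write.csv(
    chunks[[i]],
    file.path(data_dir, sprintf("iris_%02d.csv", i)),
    row.names = FALSE
  )
}

# These are the paths that would later be handed to the workers
files <- dir(data_dir, full.names = TRUE)
files
```

Keep in mind that a column such as Species is stored as text in a CSV file, so it would likely need to be converted back to a factor on the workers before fitting a classification model.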

Then, you can run:

library(multidplyr)
library(dplyr)
library(vroom)

# Create the cluster of workers
cl <- new_cluster(4)

# Create a character vector with the list of data files
files <- dir("/path/to/data/directory", full.names = TRUE)

# Split up the vector amongst the workers
cluster_assign_partition(cl, files = files)

# Create a data frame called split_iris on each worker
# (vroom is called with :: because the package is not attached on the workers)
cluster_send(cl, split_iris <- vroom::vroom(files))

# Create the partitioned data frame from the workers' data frames
split_iris <- party_df(cl, "split_iris")

From here on, you can work as we did earlier.