Partitioning data with multidplyr
The package multidplyr provides simple techniques to partition data across a set of workers on the same node.
Data partitioning for memory
Case example
What if we have an even bigger dataset?
The randomForest()
function has limitations:
- It is a memory hog.
- It doesn’t run if your data frame has too many rows.
If you try to run:
bigger.R
library(randomForest)
bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e3), ]
rownames(bigger_iris) <- NULL
set.seed(123)
rf <- randomForest(Species ~ ., data = bigger_iris)
rf
on a single core, you will get:
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.
/var/spool/slurmd/job00016/slurm_script: line 5: 74451 Killed Rscript data_partition.R
slurmstepd: error: Detected 1 oom-kill event(s) in StepId=16.batch. Some of your processes may have been killed by the cgroup out-of-memory handler.
You have run out of memory.
Reducing the number of trees won’t help as the problem comes from the size of the data frame.
Similarly, using foreach
and doFuture
as we did previously won't help either: that approach spreads the trees across various cores, but again, the problem doesn't come from the number of trees, it comes from the size of the dataset.
With plan(multisession)
, you would get:
Cluster with multisession
Error in unserialize(node$con) :
MultisessionFuture (doFuture2-3) failed to receive message results from cluster RichSOCKnode #3 (PID 445273 on localhost ‘localhost’). The reason reported was ‘error reading from connection’. Post-mortem diagnostic: No process exists with this PID, i.e. the localhost worker is no longer alive. The total size of the 3 globals exported is 5.15 MiB. There are three globals: ‘big_iris’ (5.15 MiB of class ‘list’), ‘...future.seeds_ii’ (160 bytes of class ‘list’) and ‘...future.x_ii’ (112 bytes of class ‘list’)
And with plan(multicore)
:
Cluster with multicore
Error: Failed to retrieve the result of MulticoreFuture (doFuture2-2) from the forked worker (on localhost; PID 444769). Post-mortem diagnostic: No process exists with this PID, i.e. the forked localhost worker is no longer alive. The total size of the 3 globals exported is 5.15 MiB. There are three globals: ‘big_iris’ (5.15 MiB of class ‘list’), ‘...future.seeds_ii’ (160 bytes of class ‘list’) and ‘...future.x_ii’ (112 bytes of class ‘list’)
In addition: Warning message:
In mccollect(jobs = jobs, wait = TRUE) :
1 parallel job did not deliver a result
You can even try spreading the trees on multiple nodes, but things will fail as well, without any error message.
Of course, you could always try on a different machine with more memory. I used my own machine, which has more memory than this training cluster, and it worked.
But then, what if bigger_iris
is even bigger still? Say, for instance:
bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e4), ]
Then no amount of memory will save you and you will get errors similar to this:
Error in randomForest.default(m, y, ...) :
long vectors (argument 28) are not supported in .C
That’s because randomForest()
does not accept datasets with too many rows.
The bottom line is that there are situations in which the data is just too big. In such cases, you want to look at data parallelism: instead of splitting your code into tasks that can run in parallel as we did previously, you split the data into chunks and run the code in parallel on those chunks.
Of course, you could also simply run the code on a subset of your data: in many situations, properly sampling your data to reduce its size will be good enough.
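For instance, a minimal sketch of that sampling approach (the 10,000-row subset size and the names sampled_iris and rf_sampled are arbitrary) could look like this:
library(randomForest)
# Fit on a random subset of rows instead of the full data frame
set.seed(123)
sampled_iris <- bigger_iris[sample(nrow(bigger_iris), 1e4), ]
rf_sampled <- randomForest(Species ~ ., data = sampled_iris)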
But there are situations in which you really do need the whole, huge dataset. You could split the data manually, run the code on each chunk, and handle the parallel execution yourself, but that would be tedious and error-prone. The multidplyr package provides a much simpler option.
Using multidplyr
To see what happens as we use multidplyr
, let’s first run the code in an interactive session on one node with 4 cores:
# Launch the interactive job
salloc --time=50 --mem-per-cpu=7500M --cpus-per-task=4
# Then launch R
R
First, we load the packages that we need in the main session:
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
We load dplyr for the do() function.
Notice that we aren't loading the randomForest package yet: that's because we will use it on the workers, not in the main session.
Then we need to create a cluster of workers. Let’s use 4 workers that we will run on a full node:
cl <- new_cluster(4)
cl
4 session cluster [....]
Now we can load the randomForest
package on each worker:
cluster_library(cl, "randomForest")
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.
Attaching package: ‘randomForest’
The following object is masked from ‘package:dplyr’:
combine
Of course, we need to generate our big dataset:
bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e3), ]
rownames(bigger_iris) <- NULL
Then we create a partitioned data frame on the workers with the partition()
function. The function will try to split the data as evenly as possible among the workers.
If you group observations by some variable (with dplyr::group_by()
) beforehand, multidplyr
will ensure that all data points in a group end up on the same worker. This is very convenient in a lot of cases, but is not relevant here. Without grouping observations first, it is unclear how partition()
chooses which observation goes to which worker. In our data, we have all the setosa
observations first, then all the versicolor
, and finally all the virginica
. We want to make sure that each worker's call to randomForest()
runs on a sample of all 3 species. We will thus randomly shuffle the data before partitioning it (when we were parallelizing by splitting the trees, we didn't have to worry about that since each subset of trees was running on the entire dataset):
# Shuffle the rows of the data frame randomly
set.seed(11)
bigger_iris_shuffled <- bigger_iris[sample(nrow(bigger_iris)), ]
# You can check that they are shuffled
head(bigger_iris_shuffled)
Sepal.Length Sepal.Width Petal.Length Petal.Width Species
65570 6.7 3.1 4.4 1.4 versicolor
19004 5.1 3.8 1.5 0.3 setosa
73612 6.1 2.8 4.7 1.2 versicolor
28886 5.2 3.4 1.4 0.2 setosa
121310 5.6 2.8 4.9 2.0 virginica
21667 5.1 3.7 1.5 0.4 setosa
# Create the partitioned data frame
split_iris <- partition(bigger_iris_shuffled, cl)
split_iris
Source: party_df [150,000 x 5]
Shards: 4 [37,500--37,500 rows]
# A data frame: 150,000 × 5
Sepal.Length Sepal.Width Petal.Length Petal.Width Species
<dbl> <dbl> <dbl> <dbl> <fct>
1 6.7 3.1 4.4 1.4 versicolor
2 5.6 2.8 4.9 2 virginica
3 6.4 2.8 5.6 2.2 virginica
4 5.6 2.5 3.9 1.1 versicolor
5 4.7 3.2 1.6 0.2 setosa
6 6.7 3 5 1.7 versicolor
# ℹ 149,994 more rows
# ℹ Use `print(n = ...)` to see more rows
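As an aside, if we did want whole groups to stay together (the group_by() behaviour mentioned above), the partitioning would look like the following sketch (not what we want here, since each worker needs to see all 3 species; split_by_species is a hypothetical name):
# Grouping before partitioning keeps all rows of each species on the same worker
split_by_species <- bigger_iris_shuffled %>%
  group_by(Species) %>%
  partition(cl)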
If we want the code to be reproducible, we should set the seed on each worker:
cluster_send(cl, set.seed(123))
Run cluster_send()
to send code to each worker when you aren’t interested in any result (as is the case here) and cluster_call()
if you want a computation to be executed on each worker and a result to be returned.
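For instance, a quick way to check that the workers are distinct processes is to ask each of them for its PID with cluster_call() (the PIDs will of course differ on your system):
# Returns a list with one element per worker
cluster_call(cl, Sys.getpid())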
Now we can run the randomForest()
function on each worker:
split_rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .))
split_rfs
is a partitioned data frame containing the results from each worker (the intermediate randomForest models):
split_rfs
Source: party_df [4 x 1]
Shards: 4 [1--1 rows]
# A data frame: 4 × 1
rf
<list>
1 <rndmFrs.>
2 <rndmFrs.>
3 <rndmFrs.>
4 <rndmFrs.>
Now we need to bring the partitioned results into the main process:
rfs <- split_rfs %>% collect()
rfs
is a data frame with a single column called rf
:
rfs
# A tibble: 4 × 1
rf
<list>
1 <rndmFrs.>
2 <rndmFrs.>
3 <rndmFrs.>
4 <rndmFrs.>
Which means that rfs$rf
is a list:
typeof(rfs$rf)
[1] "list"
Each element of this list is a randomForest object (the 4 intermediate models created by the 4 workers):
rfs$rf
[[1]]
Call:
randomForest(formula = Species ~ ., data = .)
Type of random forest: classification
Number of trees: 500
No. of variables tried at each split: 2
OOB estimate of error rate: 0%
Confusion matrix:
setosa versicolor virginica class.error
setosa 12500 0 0 0
versicolor 0 12500 0 0
virginica 0 0 12500 0
[[2]]
Call:
randomForest(formula = Species ~ ., data = .)
Type of random forest: classification
Number of trees: 500
No. of variables tried at each split: 2
OOB estimate of error rate: 0%
Confusion matrix:
setosa versicolor virginica class.error
setosa 12500 0 0 0
versicolor 0 12500 0 0
virginica 0 0 12500 0
[[3]]
Call:
randomForest(formula = Species ~ ., data = .)
Type of random forest: classification
Number of trees: 500
No. of variables tried at each split: 2
OOB estimate of error rate: 0%
Confusion matrix:
setosa versicolor virginica class.error
setosa 12500 0 0 0
versicolor 0 12500 0 0
virginica 0 0 12500 0
[[4]]
Call:
randomForest(formula = Species ~ ., data = .)
Type of random forest: classification
Number of trees: 500
No. of variables tried at each split: 2
OOB estimate of error rate: 0%
Confusion matrix:
setosa versicolor virginica class.error
setosa 12500 0 0 0
versicolor 0 12500 0 0
virginica 0 0 12500 0
If you don’t need to explore the intermediate objects, you can combine the commands as:
rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .)) %>%
  collect()
Finally, we need to combine the 4 randomForest models into a single one. This can be done with the combine()
function from the randomForest
package (the same function we already used in our foreach expressions):
rf_all <- do.call(combine, rfs$rf)
Be careful that randomForest
and dplyr
both have a combine()
function. The one we want here is the one from the randomForest
package. To avoid all conflict and confusion, you can use randomForest::combine()
. A plain combine() also works if you make sure to load dplyr before randomForest, since functions loaded later mask those loaded earlier.
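If you want to be fully explicit, the combining step above can thus also be written as:
rf_all <- do.call(randomForest::combine, rfs$rf)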
Why are we using do.call()
? If we use:
combine(rfs$rf)
We get the silly message:
Error in combine(rfs$rf) :
Argument must be a list of randomForest objects
That is because randomForest::combine() expects the models to be passed as separate arguments, as in combine(rf1, rf2, ...), rather than as a single list: do.call() takes the list rfs$rf and passes its elements to combine() as individual arguments.
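As a reminder of how do.call() behaves with any function:
do.call(sum, list(1, 2, 3))   # equivalent to sum(1, 2, 3)
[1] 6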
Here is our final randomForest model:
rf_all
Call:
randomForest(formula = Species ~ ., data = .)
Type of random forest: classification
Number of trees: 2000
No. of variables tried at each split: 2
This is it: by splitting our data frame across 4 cores, we were able to run the code and create a randomForest model using the whole of the data.
We can test our model:
new_data <- data.frame(
  Sepal.Length = c(5.3, 4.6, 6.5),
  Sepal.Width = c(3.1, 3.9, 2.5),
  Petal.Length = c(1.5, 1.5, 5.0),
  Petal.Width = c(0.2, 0.1, 2.1)
)
predict(rf_all, new_data)
1 2 3
setosa setosa virginica
Levels: setosa versicolor virginica
Running this in an interactive session was useful to see what happens, but the way you would actually do this is by writing a script (let’s call it partition.R
):
partition.R
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
# Create cluster of workers
cl <- new_cluster(4)
# Load randomForest on each worker
cluster_library(cl, "randomForest")
# Create our big data frame
bigger_iris <- iris[rep(seq_len(nrow(iris)), each = 1e3), ]
rownames(bigger_iris) <- NULL
# Create a partitioned data frame on the workers
split_iris <- partition(bigger_iris, cl)
# Set the seed on each worker
cluster_send(cl, set.seed(123))
# Run the randomForest() function on each worker
rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .)) %>%
  collect()
# Combine the randomForest models into one
rf_all <- do.call(combine, rfs$rf)
And run it with a Bash partition.sh
script:
partition.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=7500M
#SBATCH --cpus-per-task=4
Rscript partition.R
Conclusion
multidplyr
allowed us to split our data frame across multiple workers on one node and this solved the memory issue we had with our large dataset.
Data partitioning for speed
Besides the memory advantage, are we getting any speedup from data parallelization? That is, how does this code compare with the parallelization over the number of trees that we did with foreach
and doFuture
?
We want to make sure to compare the same things. So we go back to our smaller big_iris
and we up the number of trees back to 2000.
We will compare it with the plans multisession
and multicore
that we used earlier. The minimum and median times for these two shared-memory options were 2.72 s and 3.15 s respectively.
partition_bench.R
library(multidplyr)
library(dplyr, warn.conflicts = FALSE)
library(bench)
cl <- new_cluster(4)
cluster_library(cl, "randomForest")
big_iris <- iris[rep(seq_len(nrow(iris)), each = 1e2), ]
rownames(big_iris) <- NULL
cluster_send(cl, set.seed(123))
part_rf <- function(data, cluster) {
  split_data <- partition(data, cluster)
  rfs <- split_data %>%
    do(rf = randomForest(Species ~ ., data = ., ntree = 2000)) %>%
    collect()
  do.call(combine, rfs$rf)
}
mark(rf_all <- part_rf(big_iris, cl))
partition_bench.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=7500M
#SBATCH --cpus-per-task=4
Rscript partition_bench.R
sbatch partition_bench.sh
randomForest 4.7-1.1
Type rfNews() to see new features/changes/bug fixes.
Attaching package: ‘randomForest’
The following object is masked from ‘package:dplyr’:
combine
# A tibble: 1 × 13
expression min median `itr/sec` mem_alloc `gc/sec` n_itr n_gc total_time
<bch:expr> <bch> <bch:> <dbl> <bch:byt> <dbl> <int> <dbl> <bch:tm>
1 rf_all <- pa… 2.48s 2.48s 0.403 NA 2.02 1 5 2.48s
# ℹ 4 more variables: result <list>, memory <list>, time <list>, gc <list>
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.
What about distributed memory?
Can multidplyr
run in distributed memory? There is nothing on this in the documentation, so I tried it.
I upped the number of workers to 8 and ran the code on 2 nodes with 4 cores per node and got no speedup. I also created a dataset 10 times bigger (with each = 1e4
), which causes an OOM on 4 cores on a single node, and tried it on 11 nodes with 4 cores each (10 to match the 10-fold size increase, plus one to play it safe). This didn't solve the OOM issue. I tried various other tests, all with no success.
In conclusion, it seems that multidplyr's way of creating a cluster of workers doesn't have a mechanism to spread them across nodes, and that the package thus does not allow splitting the data across nodes.
In cases where your data is so big that it doesn't fit in the memory of a single node, it doesn't seem that any R package currently allows splitting the data automatically for you.
Conclusion
As we could see, we got similar results: in this case, spreading the trees over 4 cores while each subset of trees runs on the full data (as we did with foreach
and doFuture
) takes about the same time as running all the trees on data that is spread across 4 cores.
The difference is that foreach
and doFuture
allowed us to spread the trees across nodes while multidplyr
does not allow this for the data.
Direct data loading
The method we used is very convenient, but it involves copying the data to the workers. If you want to save some memory, you can load the split data directly to the workers.
For this, first, split your data into several files and have all those files (and only those files) in a directory.
Then, you can run:
library(multidplyr)
library(dplyr)
library(vroom)
# Create the cluster of workers
cl <- new_cluster(4)
# Create a character vector with the list of data files
files <- dir("/path/to/data/directory", full.names = TRUE)
# Split up the vector amongst the workers
cluster_assign_partition(cl, files = files)
# Create a data frame called split_iris on each worker
cluster_send(cl, split_iris <- vroom(files))
# Create the partitioned data frame from the workers' data frames
split_iris <- party_df(cl, "split_iris")
From here on, you can work as we did earlier.
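For instance, assuming each file holds a shard of our iris-like data (with a Species column), the rest of the workflow could look like the following sketch:
# Load randomForest on the workers and set their seeds
cluster_library(cl, "randomForest")
cluster_send(cl, set.seed(123))
# Fit one model per shard, bring the models to the main process, and combine them
rfs <- split_iris %>%
  do(rf = randomForest(Species ~ ., data = .)) %>%
  collect()
rf_all <- do.call(randomForest::combine, rfs$rf)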