Futureverse follow-up to the tutorial
Error handling
All time benchmarks are done using the {tictoc} package and on a MacBook Pro 2021 with an Apple M1 Pro chip including 10 cores and 32 GB of RAM under macOS Sonoma 14.5.
Problem setup
Let us create a slow log function:
slow_log <- function(x) {
  Sys.sleep(1)
  log(x)
}
We can apply this function to the integers from 1 to 10, for example using purrr::map_dbl():
x1 <- 1:10
tic()
purrr::map_dbl(x1, slow_log)
toc()
[1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
[8] 2.0794415 2.1972246 2.3025851
10.064 sec elapsed
As expected, it takes 10 seconds to run.
We can parallelize this computation using furrr::future_map_dbl():
plan(multisession, workers = 2)
tic()
furrr::future_map_dbl(x1, slow_log)
toc()
[1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
[8] 2.0794415 2.1972246 2.3025851
5.256 sec elapsed
As expected, it takes 5 seconds to run.
Now, assume that the last input value has wrongly been stored as a character. Then the previous run fails with an error. Let us see this in action:
x2 <- c(as.list(1:9), "10")
tic()
furrr::future_map_dbl(x2, slow_log)
toc()
Error:
ℹ In index: 5.
Caused by error in `log()`:
! non-numeric argument to mathematical function
5.231 sec elapsed
We typically parallelize long, time-consuming computations. In the context of map-reduce operations, one of the tasks may fail, possibly after a large number of computations have already succeeded. This can happen, for example, if the input data is not as expected. In this case, the error is caught and relayed to the user, but the computation stops and the elements on which computation succeeded are not returned. This is not ideal, as we would like (i) to continue the computation on the other tasks and (ii) to have a way to retrieve the results of the tasks that succeeded, and even of the computations that succeeded on the failed task.
Futureverse vision
The goal of the future framework, in the context of map-reduce operations, is to help with parallelizing the application of a long-running function to a large number of inputs.
If the long-running function fails on some inputs, the future framework chooses by design to behave consistently with the function that is applied. This means that if the function fails on some inputs, the future framework will stop the computation and return an error.
The reason for this design choice is that the future framework is responsible for the parallelization of the function application, but not for the function itself. The function is responsible for its own error handling. This design choice is consistent with the principle of separation of concerns.
Indeed, the problem is already present with purrr::map_dbl():
tic()
purrr::map_dbl(x2, slow_log)
toc()
Error in `purrr::map_dbl()`:
ℹ In index: 10.
Caused by error in `log()`:
! non-numeric argument to mathematical function
10.107 sec elapsed
The error is caught, the computation stops and no result is returned. One may think that this is due to the purrr::map_dbl() function and might try to use lapply() instead:
tic()
lapply(x2, slow_log)
toc()
Error in log(x): non-numeric argument to mathematical function
10.048 sec elapsed
But the result is the same. The error is caught, the computation stops and no result is returned.
The underlying problem is that, in the context of applying the long-running slow_log() function repeatedly, we are not happy with the default behavior of stopping the computation when an error occurs in the base::log() function.
Solution(s)
The encouraged solution is therefore to handle the error in the function itself and to return a sentinel value when an error occurs. This way, the computation can continue until the end and we can retrieve the results of the tasks that succeeded.
We can for example implement a new version of the slow_log() function that returns NA when an error occurs and issues a warning instead of an error:
slow_log2 <- function(x) {
  tryCatch({
    slow_log(x)
  }, error = function(e) {
    warning(conditionMessage(e))
    NA_real_
  })
}
We can then parallelize this computation using furrr::future_map_dbl():
plan(multisession, workers = 2)
tic()
furrr::future_map_dbl(x2, slow_log2)
toc()
Warning in value[[3L]](cond): non-numeric argument to mathematical function
[1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
[8] 2.0794415 2.1972246 NA
5.279 sec elapsed
As expected, it takes 5 seconds to run. Furthermore, it does not stop when an error occurs and returns NA for the failed computation, which allows the computation to continue until the end and the results of the tasks that succeeded to be retrieved.
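As an aside, a similar sentinel-value wrapper can be built more concisely with purrr::possibly(); a minimal sketch (note that, unlike slow_log2(), it is quiet by default, so the error message is dropped entirely):
slow_log3 <- purrr::possibly(slow_log, otherwise = NA_real_)
furrr::future_map_dbl(x2, slow_log3)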
However, one could argue that the warning message is not informative enough because it does not indicate which input caused the error. Ideally, one would like to know the index of the offending input in the original list of values. This requires handling the error as a warning at the level of the future_map_dbl() function; one can therefore define a future_map_log() function which may look like this:
future_map_log <- function(.x, ...,
                           .options = furrr::furrr_options(),
                           .env_globals = parent.frame(),
                           .progress = FALSE) {
  furrr::future_imap_dbl(.x, \(x, y) {
    tryCatch({
      slow_log(x)
    }, error = function(e) {
      cli::cli_alert_warning("Non-numeric input at index {y}")
      NA_real_
    })
  }, .options = .options, .env_globals = .env_globals, .progress = .progress)
}
This leads to:
plan(multisession, workers = 2)
tic()
future_map_log(x2)
toc()
! Non-numeric input at index 10
[1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
[8] 2.0794415 2.1972246 NA
5.301 sec elapsed
You can find more details in Henrik Bengtsson’s vignette dedicated to common issues with futures.
Wrong index information upon error in {furrr}
When using the {purrr} package to manipulate lists instead of the base R *apply() family, the user gets to know the index of the element that caused an error:
purrr::map_dbl(list(1, 2, 3, "4"), log)
Error in `purrr::map_dbl()`:
ℹ In index: 4.
Caused by error:
! non-numeric argument to mathematical function
Parallel versions of the purrr::*map*() functions are provided by the {furrr} package. We would expect the same behavior when an error occurs in a future:
plan(multisession, workers = 2)
furrr::future_map_dbl(list(1, 2, 3, "4"), log)
Error:
ℹ In index: 2.
Caused by error:
! non-numeric argument to mathematical function
This is not the case. The error message does not provide the correct index of the element that caused the error. What it currently does is to provide the index in the subset of the data that was sent to the worker that failed. This is not very helpful as the user is interested in the index in the original data. This is a known issue and is discussed in Issue #250 on the GitHub repository of the package.
In the meantime, a workaround is to use the idea behind the purrr::imap_*() functions, which keep track of the index of the element in the original list:
my_future_map_dbl <- function(.x, .f, ...,
                              .options = furrr::furrr_options(),
                              .env_globals = parent.frame(),
                              .progress = FALSE) {
  furrr::future_imap_dbl(.x, \(x, y) {
    tryCatch({
      .f(x)
    }, error = function(e) {
      cli::cli_alert_danger("Non-numeric input at index {y} in the original input list.", wrap = TRUE)
      cli::cli_alert_warning("The index mentioned in the message below refers to the subset of the data that was sent to the worker that failed.", wrap = TRUE)
    })
  }, .options = .options, .env_globals = .env_globals, .progress = .progress)
}
This leads to:
plan(multisession, workers = 2)
my_future_map_dbl(list(1, 2, 3, "4"), log)
✖ Non-numeric input at index 4 in the original input list.
! The index mentioned in the message below refers to the subset of the data
that was sent to the worker that failed.
Error:
ℹ In index: 2.
Caused by error:
! Can't coerce from a string to a double.
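The key ingredient above is the imap idea itself: the second argument of the mapper receives the element’s name or, for an unnamed list, its position. A minimal illustration:
purrr::imap_chr(list("a", "b", "c"), \(x, y) paste0("element ", x, " has index ", y))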
Futureverse + optimised BLAS (MKL, OpenBLAS, vecLib)
All time benchmarks are done using the {tictoc} package and on a Dell computer with Intel(R) Xeon(R) W-10885M CPU @ 2.40GHz processor including 8 cores and 64 GB of RAM under Windows 11.
Context
It is possible to boost matrix calculations in R by replacing the default BLAS library with an optimized one. The most popular optimized BLAS libraries are Intel’s Math Kernel Library (MKL), OpenBLAS and vecLib (on macOS). See these articles for setup instructions and some benchmarks:
- https://thomasmcrow.com/blog/2021-08-optimized-blas-in-r/
- https://mpopov.com/blog/2019/06/04/faster-matrix-math-in-r-on-macos/
- https://csantill.github.io/RPerformanceWBLAS/
- https://stateofther.github.io/finistR2023/Intel_MKL.html
These optimized BLAS libraries are typically faster than the default BLAS library that comes with R by exploiting vectorization and parallelization.
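If you are unsure which BLAS/LAPACK your R session is linked against, you can query it from within R itself; a quick sketch (the BLAS entry of extSoftVersion() requires a recent R version):
sessionInfo()$BLAS
sessionInfo()$LAPACK
## or
extSoftVersion()["BLAS"]
La_library()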
Problem setup
The problem is that combining the optimized BLAS libraries with the future framework seems to deteriorate the performance of the optimized BLAS and leads to increased computation time.
Use of parallelization in optimized BLAS libraries
By default, an optimized BLAS uses all available cores. However, the optimization is mostly due to vectorization rather than parallelization. See this webpage for a better understanding of the gain one can expect from MKL. Importantly, it is mentioned that:
Most of the benefit of the Intel MKL is from vectorised math, not multi-threading. A big performance boost when using the MKL with just one thread. A marginal increase when using 4 threads, most notable in matrix multiply, and no benefit for singular value decomposition.
Another resource on HPC here.
Example 1: matrix inversion
## Have a look at the nb of cores / threads
## Nb cores
RhpcBLASctl::get_num_cores()
## Nb threads
RhpcBLASctl::get_num_procs()

## Generate a list of matrices to invert
N <- 16 ## Matrix number
p <- 2000 ## Matrix size
x <- lapply(1:N, \(i) matrix(rnorm(p * p), p, p) + diag(p))

## Make a function that controls the number of threads used by the math lib
## to compute the matrix inverse
compute_inv <- function(x, nbthr = 1) {
  RhpcBLASctl::blas_set_num_threads(threads = nbthr)
  solve(x)
}
## Default
tic()
res <- lapply(x, solve)
toc()
2.56 sec elapsed
## Then disable MKL parallel computation
tic()
res <- lapply(x, compute_inv, nbthr = 1)
toc()
2.71 sec elapsed
## Then use future but without parallelization
plan(sequential)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
2.8 sec elapsed
Reducing the number of threads does not affect the computation time! The computational cost reduction is due to vectorization, not multi-threading.
## Now use 2 workers
plan(multisession, workers = 2)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
4.19 sec elapsed
## Now use 4 workers
plan(multisession, workers = 4)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
5.03 sec elapsed
## Now use 8 workers
plan(multisession, workers = 8)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
5.5 sec elapsed
## Now use 16 workers
plan(multisession, workers = 16)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
7.68 sec elapsed
The performance degrades systematically: the higher the number of workers, the longer the computation time.
Example 2: matrix product
## Generate a list of matrices to multiply with a reference matrix
N <- 16 ## Matrix number
NbRow <- 5000 ## Matrix nb rows
NbCol <- 1000 ## Matrix nb columns

x <- lapply(1:N, \(i) matrix(rnorm(NbRow * NbCol), NbRow, NbCol))
RefMat <- matrix(rnorm(NbRow * NbCol), NbRow, NbCol)

## Make a function that controls the number of threads used by the math lib
## to compute the matrix crossprod
ComputeProd <- function(x, nbthr = 1) {
  RhpcBLASctl::blas_set_num_threads(threads = nbthr)
  crossprod(x, RefMat)
}
Using the same setup as for Example 1 (a sketch of the calls is given after the list), we get the following computational times:
- Default (MKL parallel computation): 0.67 sec;
- Disable MKL parallel computation: 2.74 sec;
- Future without parallelization: 2.95 sec;
- Future with 2 workers: 2.95 sec;
- Future with 4 workers: 2.89 sec;
- Future with 8 workers: 4 sec;
- Future with 16 workers: 6.71 sec.
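For completeness, the timings above were obtained with the same pattern as in Example 1, i.e. something along these lines (a sketch):
## Default (BLAS multi-threading on)
tic()
res <- lapply(x, \(m) crossprod(m, RefMat))
toc()
## Disable BLAS multi-threading
tic()
res <- lapply(x, ComputeProd, nbthr = 1)
toc()
## Future with 2 workers (same pattern for 4, 8 and 16)
plan(multisession, workers = 2)
tic()
res <- furrr::future_map(x, ComputeProd, nbthr = 1)
toc()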
Current understanding
For now the results are not intuitive at all:
- solve mostly makes use of vectorization from MKL, as illustrated by the fact that changing the number of threads does not change the performance;
- in contrast, crossprod benefits from multi-threading: reducing the number of threads degrades the computational speed.
So we expect that reducing the number of threads to \(1\) will not hamper solve(), allowing the use of futures for parallelization over matrices, and we should observe some (possibly significant) gain. However, as soon as we increase the number of workers, we degrade the performance. By contrast, in the matrix product case, reducing the number of threads used by MKL allows for parallelization over matrices via futures, but at the cost of some increase in computation time for each cross product. So there seems to be a trade-off, possibly hard to manage, between the implicit multi-threading of MKL and the explicit parallelization of future. However, we observe essentially no impact when the number of workers is low, and a degradation when it is increased…
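One pragmatic way to navigate this trade-off is to benchmark a small grid of (workers, BLAS threads) combinations on the target machine. A minimal sketch, reusing ComputeProd() and the x matrices from Example 2:
grid <- expand.grid(workers = c(2L, 4L, 8L), threads = c(1L, 2L, 4L))
for (i in seq_len(nrow(grid))) {
  plan(multisession, workers = grid$workers[i])
  tic(sprintf("workers = %d, threads = %d", grid$workers[i], grid$threads[i]))
  res <- furrr::future_map(x, ComputeProd, nbthr = grid$threads[i])
  toc()
}
plan(sequential) ## reset the plan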
Using source() in a future
According to Henrik Bengtsson’s vignette dedicated to common issues with futures:
Avoid using source() inside futures. It is always better to source external R scripts at the top of your main R script, e.g.
library(future)
source("./my-script.R")
f <- future({
  ...
})
However, if you find yourself having to source a script inside a future, or inside a function, make sure to specify argument local = TRUE, e.g.
f <- future({
  source("./my-script.R", local = TRUE)
  ...
})
This is because source() defaults to local = FALSE, which has side effects. When using local = FALSE, any functions or variables defined by the R script are assigned to the global environment - not the calling environment as we might expect. This may make little difference when calling source() from the R prompt, or from another script. However, when called from inside a function, inside local(), or inside a future, it might result in unexpected behavior. It is similar to using assign("a", 42, envir = globalenv()), which is known to be a bad practice. To be on the safe side, it is almost always better to call source() with local = TRUE.
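A minimal illustration of this side effect, using a hypothetical temporary script that defines a <- 42:
script <- tempfile(fileext = ".R")
writeLines("a <- 42", script)
f <- function() {
  source(script) ## local = FALSE by default
  exists("a", where = environment(), inherits = FALSE)
}
f() ## FALSE: `a` was not created in the function's environment
exists("a", envir = globalenv()) ## TRUE: `a` leaked into the global environment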
Nested parallelization with {future}
All time benchmarks are done using the {tictoc} package and on a Lenovo LOQ 15IRH8 with an Intel i5-12450H including 12 cores and 16 GB of RAM under Ubuntu 24.04.1 LTS.
library(future)
library(future.apply)
library(future.callr)
library(progressr)
Nested task definition
We define the following task where we split a sum into sub-sums and perform this multiple times. To slow down computations, we use progressr::slow_sum().
delay <- 0.5

list_sums <- list(1:10, 11:20)
list_list_sums <- rep(list(list_sums), 2L)

future_nested_loop <- function() {
  unlist(future.apply::future_lapply(list_list_sums, function(l_sums) {
    v_sums <- unlist(
      future.apply::future_lapply(l_sums, slow_sum,
        delay = delay,
        message = FALSE
      )
    )
    sum(v_sums)
  }))
}
Below we implement a checking function to ensure everything went fine and the returned result was correct.
expected_result <- rep(sum(unlist(list_sums)), length(list_list_sums))

check_result <- function(task_time, task_result) {
  if (!all(task_result == expected_result)) {
    cli::cli_abort(message = c("{task_time$msg} result do not match expected result",
      "x" = "Expected : {expected_result} got {task_result}"
    ))
  } else {
    cli::cli_alert_success("Task returned expected result")
  }
}
R and base plan with {future}
Base R and plan(sequential)
With R base functions:
tic("Base R")
base_r_result <- unlist(lapply(list_list_sums, function(l_sums) {
  v_sums <- unlist(
    lapply(l_sums, slow_sum, delay = delay, message = FALSE)
  )
  sum(v_sums)
}))
base_r_time <- toc()
Base R: 20.076 sec elapsed
check_result(task_time = base_r_time, base_r_result)
✔ Task returned expected result
With our delay values we have the following:
- \(10 \times 0.5\,s\) for each sub-sum.
- \(2 \times 10 \times 0.5\,s\) for each sub-list of sub-sums.
So \(2 \times 2 \times 10 \times 0.5\,s\) for the list of sub-lists of sub-sums, i.e. \(20\,s\), which is approximately what we observe with {tictoc}: \(20.076\) s.
With plan(sequential)
plan(sequential)
tic("Future plan(sequential)")
sequential_result <- future_nested_loop()
sequential_time <- toc()
Future plan(sequential): 20.145 sec elapsed
check_result(
task_time = sequential_time,
task_result = sequential_result
)
With the sequential plan it takes \(20.145\) s, which is comparable with base R functions.
plan(multisession)
plan(tweak("multisession", workers = 2L))
tic("Future plan(tweak('multisession', workers = 2L))")
multisession2_result <- future_nested_loop()
multisession2_time <- toc()
Future plan(tweak('multisession', workers = 2L)): 10.365 sec elapsed
check_result(
task_time = multisession2_time,
task_result = multisession2_result
)
✔ Task returned expected result
By allocating 2 workers with multisession, we manage to obtain a duration below that of sequential: \(10.365\) s.
plan(multisession(workers = 4L))
tic("Future plan(multisession(workers = 4L))")
multisession4_result <- future_nested_loop()
multisession4_time <- toc()
Future plan(multisession(workers = 4L)): 10.404 sec elapsed
check_result(
task_time = multisession4_time,
task_result = multisession4_result
)
✔ Task returned expected result
By allocating 4 workers, we observe no improvement: it takes \(10.404\) s.
This is because {future} doesn’t leverage nested parallelization by default: since the first future_lapply runs over a two-element list, the maximal time reduction is obtained with two workers. The other two workers allocated in this example are not used.
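One way to see this is to query the number of workers available at the nested level; a sketch using future::nbrOfWorkers() (with a single-level plan, each outer future falls back to a sequential nested plan):
plan(multisession, workers = 4L)
future.apply::future_lapply(1:2, function(i) future::nbrOfWorkers())
## Each outer task reports a single worker for its nested futures.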
Nested parallelization
According to the documentation, we can specify not only a single plan but a list of plans, which can apply to nested futures.
plan(list(multisession, multisession))
tic("Future plan(list(multisession, multisession))")
list_multisession_result <- future_nested_loop()
list_multisession_time <- toc()
Future plan(list(multisession, multisession)): 10.651 sec elapsed
check_result(
task_time = list_multisession_time,
task_result = list_multisession_result
)
✔ Task returned expected result
But contrary to what we expected, the elapsed time does not decrease, it increases! The task takes \(10.651\) s.
As Henrik Bengtsson explains in the vignette Future Topologies, {future} has a protection against recursive parallelism, and thus the second loop is not parallelized but forced to be sequential.
The plan must thus be specified in a specific manner to work as intended.
Plan lists with explicit parameters
Given the structure of our task, we want the first loop to run on the 2 elements of the list in parallel, thus we need 2 workers for the first future_lapply. Each sub-list containing 2 elements, we want to allocate 2 workers to each. As we want to parallelize the whole task, we need to allocate \(2 \times 2 = 4\) workers.
plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L)))
tic("Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L)))")
list_multisession2_2_result <- rlang::try_fetch(
  future_nested_loop(),
  error = function(cnd) rlang::inform("Task failed.", parent = cnd)
)
Warning in checkNumberOfLocalWorkers(workers): Careful, you are setting up 2
localhost parallel workers with only 1 CPU cores available for this R process
(per ‘mc.cores’), which could result in a 200% load. The soft limit is set to
100%. Overusing the CPUs has negative impact on the current R process, but also
on all other processes of yours and others running on the same machine. See
help("parallelly.options", package = "parallelly") for how to override the soft
and hard limits
Warning in checkNumberOfLocalWorkers(workers): Careful, you are setting up 2
localhost parallel workers with only 1 CPU cores available for this R process
(per ‘mc.cores’), which could result in a 200% load. The soft limit is set to
100%. Overusing the CPUs has negative impact on the current R process, but also
on all other processes of yours and others running on the same machine. See
help("parallelly.options", package = "parallelly") for how to override the soft
and hard limits
list_multisession2_2_time <- toc()
Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L))): 6.349 sec elapsed
check_result(
task_time = list_multisession2_2_time,
task_result = list_multisession2_2_result
)
✔ Task returned expected result
With this plan we get some warnings explaining that we are over-parallelizing. We get this message even though we have the correct number of cores (or workers) to properly run this task.
This is {future}’s defense mechanism against recursive parallelism.
{future} has a soft limit (which gives warnings) and a hard limit; when the hard limit (300% load per core) is reached, the future won’t run, even if there are enough available cores.
The next callout block and the next code block explain how to tell {future} that this plan is OK.
Let {future} know that we want recursive parallelism
To tell the package that we know what we are doing, we must use I(), which gives the AsIs class to the object it wraps.
plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L))))
tic("Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L))))")
list_multisession2_I2_result <- rlang::try_fetch(
  future_nested_loop(),
  error = function(cnd) rlang::inform("Task failed.", parent = cnd)
)
list_multisession2_I2_time <- toc()
Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L)))): 6.396 sec elapsed
check_result(
task_time = list_multisession2_I2_time,
task_result = list_multisession2_I2_result
)
✔ Task returned expected result
And this way we can specify a working nested parallelization plan.
With {future.callr}
The {future.callr} package solves some limitations of future::multisession() and does not require the use of I(), as shown in the example below:
plan(list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)))
tic("Future `plan(list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)))`")
list_callr2_2_result <- rlang::try_fetch(
  future_nested_loop(),
  error = function(cnd) rlang::inform("Task failed.", parent = cnd)
)
list_callr2_2_time <- toc()
Future `plan(list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)))`: 6.299 sec elapsed
check_result(
task_time = list_callr2_2_time,
task_result = list_callr2_2_result
)
✔ Task returned expected result
Execution time summary table
| future::plan(.) | Elapsed time (s) |
|---|---|
| Base R | 20.076 |
| sequential | 20.145 |
| multisession(workers = 2L) | 10.365 |
| multisession(workers = 4L) | 10.404 |
| list(multisession, multisession) | 10.651 |
| list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L)) | 6.349 |
| list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L))) | 6.396 |
| list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)) | 6.299 |