Futureverse follow-up to the tutorial

Authors

Tristan Mary-Huard

Aymeric Stamm

Louis Lacoste

Error handling

Benchmarking

All time benchmarks are done using the {tictoc} package, on a 2021 MacBook Pro with an Apple M1 Pro chip (10 cores) and 32 GB of RAM, under macOS Sonoma 14.5.

Problem setup

Let us create a slow log function:

slow_log <- function(x) {
  Sys.sleep(1)
  log(x)
}

We can apply this function to the integers from 1 to 10, for example using purrr::map_dbl():

x1 <- 1:10
tic()
purrr::map_dbl(x1, slow_log)
toc()
  [1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
  [8] 2.0794415 2.1972246 2.3025851

10.064 sec elapsed

As expected, it takes 10 seconds to run.

We can parallelize this computation using furrr::future_map_dbl():

plan(multisession, workers = 2)
tic()
furrr::future_map_dbl(x1, slow_log)
toc()
  [1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
  [8] 2.0794415 2.1972246 2.3025851

5.256 sec elapsed

As expected, it takes about 5 seconds to run, since the ten one-second tasks are split across two workers.

Now, assume that the last input value has wrongly been stored as a character. The previous run will then fail with an error. Let us see this in action:

x2 <- c(as.list(1:9), "10")
tic()
furrr::future_map_dbl(x2, slow_log)
toc()
Error:
ℹ In index: 5.
Caused by error in `log()`:
! non-numeric argument to mathematical function

5.231 sec elapsed

Apparent problem

We typically parallelize long, time-consuming computations. In the context of map-reduce operations, one of the tasks may fail, possibly after a large number of computations have already succeeded. This can happen, for example, if the input data is not as expected. In this case, the error is caught and relayed to the user, but the computation stops and the elements on which computation succeeded are not returned. This is not ideal: we would like (i) the computation to continue on the other tasks and (ii) a way to retrieve the results of the tasks that succeeded, and even of the computations that succeeded on the failed task.

Futureverse vision

Design strategy

The goal of the future framework, in the context of map-reduce operations, is to help with parallelizing the application of a long-running function to a large number of inputs.

If the long-running function fails on some inputs, the future framework chooses by design to behave consistently with the function that is applied. This means that if the function fails on some inputs, the future framework will stop the computation and return an error.

The reason for this design choice is that the future framework is responsible for the parallelization of the function application, but not for the function itself. The function is responsible for its own error handling. This design choice is consistent with the principle of separation of concerns.

Indeed, the problem is already here with purrr::map_dbl():

tic()
purrr::map_dbl(x2, slow_log)
toc()
Error in `purrr::map_dbl()`:
ℹ In index: 10.
Caused by error in `log()`:
! non-numeric argument to mathematical function

10.107 sec elapsed

The error is caught, the computation stops and no result is returned. One may think that this is due to the purrr::map_dbl() function and might try to use lapply() instead:

tic()
lapply(x2, slow_log)
toc()
Error in log(x): non-numeric argument to mathematical function

10.048 sec elapsed

But the result is the same. The error is caught, the computation stops and no result is returned.

True problem

The underlying problem is that, in the context of applying the long-running slow_log() function repeatedly, we are not happy with the default behavior of stopping the computation when an error occurs in the base::log() function.

Solution(s)

The encouraged solution is therefore to handle the error in the function itself and to return a sentinel value when an error occurs. This way, the computation can continue until the end and we can retrieve the results of the tasks that succeeded.

We can for example implement a new version of the slow_log() function that returns NA when an error occurs and issues a warning instead of an error:

slow_log2 <- function(x) {
  tryCatch({
    slow_log(x)
  }, error = function(e) {
    warning(conditionMessage(e))
    NA_real_
  })
}

We can then parallelize this computation using furrr::future_map_dbl():

plan(multisession, workers = 2)
tic()
furrr::future_map_dbl(x2, slow_log2)
toc()
Warning in value[[3L]](cond): non-numeric argument to mathematical function

 [1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
 [8] 2.0794415 2.1972246        NA

5.279 sec elapsed

As expected, it takes about 5 seconds to run. Furthermore, it does not stop when an error occurs: it returns NA for the failed computation, which allows the computation to continue until the end and the results of the successful tasks to be retrieved.

However, one could argue that the warning message is not informative enough because it does not give an indication of which input caused the error. Ideally, one would like to know the index of the offending input in the original list of values. This requires handling the error as a warning at the level of the future_map_dbl() function; one can therefore define a future_map_log() function which may look like this:

future_map_log <- function(.x, ..., 
                           .options = furrr::furrr_options(), 
                           .env_globals = parent.frame(), 
                           .progress = FALSE) {
  furrr::future_imap_dbl(.x, \(x, y) {
    tryCatch({
      slow_log(x)
    }, error = function(e) {
      cli::cli_alert_warning("Non-numeric input at index {y}")
      NA_real_
    })
  }, .options = .options, .env_globals = .env_globals, .progress = .progress)
}

This leads to:

plan(multisession, workers = 2)
tic()
future_map_log(x2)
toc()
! Non-numeric input at index 10

 [1] 0.0000000 0.6931472 1.0986123 1.3862944 1.6094379 1.7917595 1.9459101
 [8] 2.0794415 2.1972246        NA

5.301 sec elapsed

You can find more details in Henrik Bengtsson’s vignette dedicated to common issues with futures.

Wrong index information upon error in {furrr}

When using the {purrr} package to manipulate lists instead of the base R *apply() family, the user is told the index of the element that caused an error:

purrr::map_dbl(list(1, 2, 3, "4"), log)
Error in `purrr::map_dbl()`:
ℹ In index: 4.
Caused by error:
! non-numeric argument to mathematical function

Parallel versions of the purrr::*map*() functions are provided by the {furrr} package. We would expect the same behavior when an error occurs in a future:

plan(multisession, workers = 2)
furrr::future_map_dbl(list(1, 2, 3, "4"), log)
Error:
ℹ In index: 2.
Caused by error:
! non-numeric argument to mathematical function

This is not the case: the error message does not provide the correct index of the element that caused the error. What it currently provides is the index within the subset of the data that was sent to the worker that failed. This is not very helpful, as the user is interested in the index in the original data. This is a known issue, discussed in Issue #250 on the GitHub repository of the package.

In the meantime, a workaround is to use the idea behind the purrr::imap_*() functions, which keep track of the index of the element in the original list:

my_future_map_dbl <- function(.x, .f, ..., 
                              .options = furrr::furrr_options(), 
                              .env_globals = parent.frame(), 
                              .progress = FALSE) {
  furrr::future_imap_dbl(.x, \(x, y) {
    tryCatch({
      .f(x)
    }, error = function(e) {
      cli::cli_alert_danger("Non-numeric input at index {y} in the original input list.", wrap = TRUE)
      cli::cli_alert_warning("The index mentioned in the message below refers to the subset of the data that was sent to the worker that failed.", wrap = TRUE)
      ## Note: no numeric value is returned here, so future_imap_dbl() still
      ## raises a coercion error (with the chunk-local index) right after the
      ## two messages above have been printed.
    })
  }, .options = .options, .env_globals = .env_globals, .progress = .progress)
}

This leads to:

plan(multisession, workers = 2)
my_future_map_dbl(list(1, 2, 3, "4"), log)
✖ Non-numeric input at index 4 in the original input list.
! The index mentioned in the message below refers to the subset of the data
that was sent to the worker that failed.
Error:
ℹ In index: 2.
Caused by error:
! Can't coerce from a string to a double.

Futureverse + optimised BLAS (MKL, OpenBLAS, vecLib)

Benchmarking

All time benchmarks are done using the {tictoc} package, on a Dell computer with an Intel(R) Xeon(R) W-10885M CPU @ 2.40GHz (8 cores) and 64 GB of RAM, under Windows 11.

Context

It is possible to boost matrix calculations in R by replacing the default BLAS library with an optimized one. The most popular optimized BLAS libraries are Intel’s Math Kernel Library (MKL), OpenBLAS and vecLib (on macOS). See the articles available online for how to set them up and for benchmarks.

These optimized BLAS libraries are typically faster than the default BLAS library that comes with R by exploiting vectorization and parallelization.
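
Before benchmarking, it is useful to check which BLAS R is actually linked against and how many threads it is allowed to use. A minimal sketch, assuming the {RhpcBLASctl} package used later in this section:

## Which BLAS/LAPACK libraries is R linked against? (reported since R 3.4.0)
sessionInfo()$BLAS
sessionInfo()$LAPACK
## How many threads may the BLAS currently use?
RhpcBLASctl::blas_get_num_procs()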

Problem setup

Problem

The problem is that combining the optimized BLAS libraries with the future framework seems to deteriorate the performance of the optimized BLAS and leads to increased computation time.

Use of parallelization in optimized BLAS libraries

By default, an optimized BLAS uses all available cores. However, the speed-up is mostly due to vectorization rather than parallelization. See this webpage for a better understanding of the gain one can expect from MKL. Importantly, it is mentioned that:

Most of the benefit of the Intel MKL is from vectorised math, not multi-threading. A big performance boost when using the MKL with just one thread. A marginal increase when using 4 threads, most notable in matrix multiply, and no benefit for singular value decomposition.

Another resource on HPC is available here.

Example 1: matrix inversion

## Have a look at the nb of cores / threads
## Nb cores
RhpcBLASctl::get_num_cores()
## Nb threads 
RhpcBLASctl::get_num_procs()

## Generate a list of matrices to inverse
N <- 16    ## Matrix number
p <- 2000   ## Matrix size
x <- lapply(1:N, \(i) matrix(rnorm(p * p), p, p) + diag(p))

## Make a function that controls the number of threads used by the math lib 
## to compute the matrix inverse
compute_inv <- function(x, nbthr = 1) {
  RhpcBLASctl::blas_set_num_threads(threads = nbthr)
  solve(x)
}
## Default 
tic()
res <- lapply(x, solve)
toc()
2.56 sec elapsed
## Then disable MKL parallel computation
tic()
res <- lapply(x, compute_inv, nbthr = 1)
toc()
2.71 sec elapsed
## Then use future but without parallelization
plan(sequential)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
2.8 sec elapsed

Reducing the number of threads does not affect the computation time! The performance gain of MKL here comes from vectorization, not multi-threading.

## Now use 2 workers
plan(multisession, workers = 2)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
4.19 sec elapsed
## Now use 4 workers
plan(multisession, workers = 4)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
5.03 sec elapsed
## Now use 8 workers
plan(multisession, workers = 8)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
5.5 sec elapsed
## Now use 16 workers
plan(multisession, workers = 16)
tic()
res <- furrr::future_map(x, compute_inv, nbthr = 1)
toc()
7.68 sec elapsed

The performance degrades systematically: the more workers, the longer the computation.

Example 2: matrix product

## Generate a list of pairs of matrices to multiply with a ref matrix
N <- 16    ## Matrix number
NbRow <- 5000   ## Matrix nb row
NbCol <- 1000   ## Matrix nb col

x <- lapply(1:N, \(i) matrix(rnorm(NbRow * NbCol), NbRow, NbCol))
RefMat <- matrix(rnorm(NbRow * NbCol), NbRow, NbCol) 

## Make a function that controls the number of threads used by the math lib 
## to compute the matrix crossprod
ComputeProd <- function(x, nbthr = 1) {
  RhpcBLASctl::blas_set_num_threads(threads = nbthr)
  crossprod(x, RefMat)
}

Using the same setup as for Example 1, we get the following computational times (a sketch of the corresponding runs is shown after this list):

  • Default (MKL parallel computation): 0.67 sec;
  • Disable MKL parallel computation: 2.74 sec;
  • Future without parallelization: 2.95 sec;
  • Future with 2 workers: 2.95 sec;
  • Future with 4 workers: 2.89 sec;
  • Future with 8 workers: 4 sec;
  • Future with 16 workers: 6.71 sec.
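
For reference, here is a sketch of the runs behind these timings, mirroring the code of Example 1 (the loop over worker counts is our shorthand, not the original script):

## Default (MKL parallel computation)
tic(); res <- lapply(x, crossprod, RefMat); toc()
## Disable MKL parallel computation
tic(); res <- lapply(x, ComputeProd, nbthr = 1); toc()
## Future without parallelization
plan(sequential)
tic(); res <- furrr::future_map(x, ComputeProd, nbthr = 1); toc()
## Future with 2, 4, 8 and 16 workers
for (w in c(2, 4, 8, 16)) {
  plan(multisession, workers = w)
  tic(); res <- furrr::future_map(x, ComputeProd, nbthr = 1); toc()
}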

Current understanding

For now, the results are not intuitive at all:

  • solve() mostly makes use of MKL’s vectorization, as illustrated by the fact that changing the number of threads does not change the performance;
  • in contrast, crossprod() benefits from multi-threading: reducing the number of threads slows it down considerably (0.67 s with all threads vs 2.74 s with a single one).

So we expect that reducing the number of threads to \(1\) will not hamper solve(), allowing the use of futures to parallelize over matrices, and we should observe some (significant?) gain. However, as soon as we increase the number of workers, we degrade the performance. In contrast, in the matrix product case, reducing the number of threads used by MKL allows for parallelization over matrices via futures, but at the cost of some increase in computation time for each cross product. So there seems to be a trade-off, possibly hard to navigate, between the implicit multi-threading of MKL and the explicit parallelism of {future}. However, we basically observe no impact when the number of workers is low, and a degradation when it is increased…
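
One way to probe this trade-off empirically is to split the available threads between the workers. A hedged sketch (the 4 workers × 2 threads split on this 8-core machine is an illustrative choice, not a benchmarked recommendation):

## Hypothetical split: 4 future workers, each allowed 2 BLAS threads,
## instead of 1 worker with 8 threads or 8 workers with 1 thread each
plan(multisession, workers = 4)
tic()
res <- furrr::future_map(x, ComputeProd, nbthr = 2)
toc()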

Using source() in a future

According to Henrik Bengtsson’s vignette dedicated to common issues with futures:

Avoid using source() inside futures. It is always better to source external R scripts at the top of your main R script, e.g.

library(future)
source("./my-script.R")

f <- future({
  ...
})

However, if you find yourself having to source a script inside a future, or inside a function, make sure to specify argument local = TRUE, e.g.

f <- future({
  source("./my-script.R", local = TRUE)
  ...
})

This is because source() defaults to local = FALSE, which has side effects. When using local = FALSE, any functions or variables defined by the R script are assigned to the global environment - not the calling environment as we might expect. This may make little difference when calling source() from the R prompt, or from another script. However, when called from inside a function, inside local(), or inside a future, it might result in unexpected behavior. It is similar to using assign("a", 42, envir = globalenv()), which is known to be a bad practice. To be on the safe side, it is almost always better to call source() with local = TRUE.
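
A minimal illustration of the difference, using a hypothetical one-line script defs.R that defines a <- 42:

writeLines("a <- 42", "defs.R")   # hypothetical helper script

f <- function() {
  source("defs.R")                # local = FALSE: `a` lands in globalenv()
  exists("a", inherits = FALSE)   # FALSE: `a` is not in f()'s own frame
}
g <- function() {
  source("defs.R", local = TRUE)  # `a` lands in the calling environment
  exists("a", inherits = FALSE)   # TRUE
}
f()
g()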

Sharing big matrices between R processes

The {bigmemory} package provides mechanisms for working with very large matrices that can be updated in-place, which helps save memory.
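
For illustration, here is a minimal sketch of creating a file-backed matrix with {bigmemory} (dimensions and file names are arbitrary):

library(bigmemory)

## A file-backed big.matrix is memory-mapped from disk, so element-wise
## writes happen in place instead of copying the whole matrix
X <- filebacked.big.matrix(1000, 1000, type = "double",
                           backingfile = "X.bin",
                           descriptorfile = "X.desc")
X[1, 1] <- 42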

Problem of non-exportable objects

Some types of R objects can be used only in the R session in which they were created. If used as-is in another R process, such objects often result in an immediate error or in obscure and hard-to-troubleshoot outcomes. Because of this, they cannot be saved to file and re-used at a later time. They also cannot be exported to a parallel worker when doing parallel processing. These objects are sometimes referred to as non-exportable or non-serializable objects.
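
The {future} package can help flag such objects up front: the future.globals.onReference option (disabled by default for performance reasons) scans the globals exported to a future for external references.

## Make futures error immediately when a global contains an external
## reference, instead of failing obscurely on the worker
options(future.globals.onReference = "error")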

Objects of class big.matrix are non-exportable as-is: they cannot be sent to a parallel worker because such an object is merely a reference to a memory-mapped file, which the worker cannot access on its own.
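
Until marshalling support lands (see the next sections), a common workaround for file-backed matrices on a single machine is to export {bigmemory}’s small descriptor object instead of the matrix itself, and to re-attach the matrix on the worker. A sketch, reusing the X matrix created above:

library(future)
plan(multisession, workers = 2)

desc <- bigmemory::describe(X)  # small, serializable description of X

f <- future({
  ## Re-attach the memory-mapped backing file from its descriptor
  Xw <- bigmemory::attach.big.matrix(desc)
  sum(Xw[1, ])
})
value(f)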

The marshalling solution

One solution to this problem is to use “marshalling” to encode the R object into an exportable representation that then can be used to re-create a copy of that object in another R process that imitates the original object.

The {marshal} package provides generic functions marshal() and unmarshal() for marshalling and unmarshalling R objects of certain classes. This makes it possible to save otherwise non-exportable objects to file for use in a future R session, or to transfer them to another R process to be used there.
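
Based on these generics, the intended usage looks as follows (a sketch only: big.matrix support is not available yet, so obj stands for an object of a class that {marshal} already supports):

library(marshal)

m <- marshal(obj)     # exportable representation of the object
obj2 <- unmarshal(m)  # re-create a copy that imitates the original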

As part of the development of the marshal package, the author has listed a number of classes that are non-exportable and assessed whether they can be marshalled and whether they must be marshalled. The list is available in an article from the package website here. One can see that the class big.matrix is in the list of non-exportable classes and has been assessed as a class that can and must be marshalled. This means that the marshal package will soon be able to marshal and unmarshal objects of class big.matrix.

Active working group on marshalling and serialization

May 2024: The R Consortium ISC Working Group ‘Marshaling and Serialization in R’ has been launched to work on this problem. So we can expect some progress in the near future.

Nested parallelization with {future}

Benchmarking

All time benchmarks are done using the {tictoc} package, on a Lenovo LOQ 15IRH8 with an Intel i5-12450H (12 cores) and 16 GB of RAM, under Ubuntu 24.04.1 LTS.

library(future)
library(future.apply)
library(future.callr)
library(progressr)

Nested task definition

We define the following task, where we split a sum into sub-sums and perform this multiple times. To slow down computations, we use progressr::slow_sum().

delay <- 0.5

list_sums <- list(1:10, 11:20)
list_list_sums <- rep(list(list_sums), 2L)
future_nested_loop <- function() {
    unlist(future.apply::future_lapply(list_list_sums, function(l_sums) {
        v_sums <- unlist(
            future.apply::future_lapply(l_sums, slow_sum,
                delay = delay,
                message = FALSE
            )
        )
        sum(v_sums)
    }))
}

Below we implement a checking function to ensure everything went fine and the returned result was correct.

expected_result <- rep(sum(unlist(list_sums)), length(list_list_sums))

check_result <- function(task_time, task_result) {
    if (!all(task_result == expected_result)) {
        cli::cli_abort(message = c("{task_time$msg} results do not match the expected result",
            "x" = "Expected : {expected_result} got {task_result}"
        ))
    } else {
        cli::cli_alert_success("Task returned expected result")
    }
}

R and base plan with {future}

Base R and plan(sequential)

With base R functions:

tic("Base R")
base_r_result <- unlist(lapply(list_list_sums, function(l_sums) {
    v_sums <- unlist(
        lapply(l_sums, slow_sum, delay = delay, message = FALSE)
    )
    sum(v_sums)
}))
base_r_time <- toc()
Base R: 20.076 sec elapsed
check_result(task_time = base_r_time, base_r_result)
✔ Task returned expected result

With our delay value we have the following:

  • \(10 \times 0.5\) s for each sub-sum;
  • \(2 \times 10 \times 0.5\) s for each sub-list of sub-sums.

So \(2 \times 2 \times 10 \times 0.5\) s \(= 20\) s for the list of sub-lists of sub-sums, which is approximately what we observe with {tictoc}: \(20.076\) s.

With plan(sequential)

plan(sequential)
tic("Future plan(sequential)")
sequential_result <- future_nested_loop()
sequential_time <- toc()
Future plan(sequential): 20.145 sec elapsed
check_result(
    task_time = sequential_time,
    task_result = sequential_result
)

With the sequential plan it takes \(20.145\) s, which is comparable with base R functions.

plan(multisession)

plan(tweak("multisession", workers = 2L))
tic("Future plan(tweak('multisession', workers = 2L))")
multisession2_result <- future_nested_loop()
multisession2_time <- toc()
Future plan(tweak('multisession', workers = 2L)): 10.365 sec elapsed
check_result(
    task_time = multisession2_time,
    task_result = multisession2_result
)
✔ Task returned expected result

By allocating 2 workers with multisession, we obtain roughly half the sequential duration: \(10.365\) s.

plan(multisession(workers = 4L))
tic("Future plan(multisession(workers = 4L))")
multisession4_result <- future_nested_loop()
multisession4_time <- toc()
Future plan(multisession(workers = 4L)): 10.404 sec elapsed
check_result(
    task_time = multisession4_time,
    task_result = multisession4_result
)
✔ Task returned expected result

By allocating 4 workers, we observe no improvement: it takes \(10.404\) s.

This is because {future} does not leverage nested parallelization by default: the first future_lapply() runs over a two-element list, so the maximal time reduction is obtained with two workers. The other two workers allocated in this example are left unused.

Nested parallelization

According to the documentation, we can specify not only a single plan but a list of plans, which then applies to nested futures.

plan(list(multisession, multisession))
tic("Future plan(list(multisession, multisession))")
list_multisession_result <- future_nested_loop()
list_multisession_time <- toc()
Future plan(list(multisession, multisession)): 10.651 sec elapsed
check_result(
    task_time = list_multisession_time,
    task_result = list_multisession_result
)
✔ Task returned expected result

But contrary to what we expected, the elapsed time does not decrease; it even increases slightly! The task takes \(10.651\) s.

Note

As Henrik Bengtsson explains in the vignette Future Topologies, {future} has a protection against recursive parallelism: the inner loop is therefore not parallelized but forced to run sequentially.

The plan must thus be specified in a specific manner to work as intended.
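
One way to see this protection at work is to ask the nested level how many workers it sees, for instance with future::nbrOfWorkers(). A sketch (the exact output depends on your machine):

plan(list(multisession, multisession))
f <- future(future::nbrOfWorkers())  # evaluated on an outer worker
value(f)
## Expected to report 1: the nested level is forced to run sequentially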

Plan lists with explicit parameters

Given the structure of our task, we want the outer loop to run on the 2 elements of the list in parallel; we thus need 2 workers for the first future_lapply().

Each sub-list contains 2 elements, so we want to allocate 2 workers to each of them.

To parallelize the whole task, we therefore need to allocate \(2 \times 2 = 4\) workers.

plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L)))
tic("Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L)))")
list_multisession2_2_result <- rlang::try_fetch(
    future_nested_loop(),
    error = function(cnd) rlang::inform("Task failed.", parent = cnd)
)
Warning in checkNumberOfLocalWorkers(workers): Careful, you are setting up 2
localhost parallel workers with only 1 CPU cores available for this R process
(per ‘mc.cores’), which could result in a 200% load. The soft limit is set to
100%. Overusing the CPUs has negative impact on the current R process, but also
on all other processes of yours and others running on the same machine. See
help("parallelly.options", package = "parallelly") for how to override the soft
and hard limits
Warning in checkNumberOfLocalWorkers(workers): Careful, you are setting up 2
localhost parallel workers with only 1 CPU cores available for this R process
(per ‘mc.cores’), which could result in a 200% load. The soft limit is set to
100%. Overusing the CPUs has negative impact on the current R process, but also
on all other processes of yours and others running on the same machine. See
help("parallelly.options", package = "parallelly") for how to override the soft
and hard limits
list_multisession2_2_time <- toc()
Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L))): 6.349 sec elapsed
check_result(
    task_time = list_multisession2_2_time,
    task_result = list_multisession2_2_result
)
✔ Task returned expected result

With this plan we get some warnings explaining that we are over-parallelizing. We get this message even though we have the right number of cores (and workers) to properly run this task.

This is {future}’s defense mechanism against recursive parallelism.

Why shouldn’t I ignore this warning?

{future} has a soft limit (which triggers warnings) and a hard limit: when the hard limit (a 300% load) is reached, the future will not run, even if enough cores are available.
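
These limits come from {parallelly} and, as the warning itself points out, they can be adjusted via options. A sketch, assuming the parallelly.maxWorkers.localhost option described in help("parallelly.options", package = "parallelly") (the values below are illustrative):

## Assumption: first value = soft limit, second value = hard limit,
## both expressed as a fraction of the available cores
options(parallelly.maxWorkers.localhost = c(2.0, 4.0))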

The next callout block and the next code block explain how to tell {future} that this plan is OK.

Let {future} know that we want recursive parallelism

To tell the package that we know what we are doing, we wrap the number of workers in I(), which gives the AsIs class to the object it wraps.

plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L))))
tic("Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L))))")
list_multisession2_I2_result <- rlang::try_fetch(
    future_nested_loop(),
    error = function(cnd) rlang::inform("Task failed.", parent = cnd)
)
list_multisession2_I2_time <- toc()
Future plan(list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L)))): 6.396 sec elapsed
check_result(
    task_time = list_multisession2_I2_time,
    task_result = list_multisession2_I2_result
)
✔ Task returned expected result

And this way we can specify a working nested parallelization plan.

With {future.callr}

The {future.callr} package lifts some limitations of future::multisession() and does not require the use of I(), as shown in the example below:

plan(list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)))
tic("Future `plan(list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)))`")
list_callr2_2_result <- rlang::try_fetch(
    future_nested_loop(),
    error = function(cnd) rlang::inform("Task failed.", parent = cnd)
)
list_callr2_2_time <- toc()
Future `plan(list(tweak(callr, workers = 2L), tweak(callr, workers = 2L)))`: 6.299 sec elapsed
check_result(
    task_time = list_callr2_2_time,
    task_result = list_callr2_2_result
)
✔ Task returned expected result

Execution time summary table

future::plan(.)                                                                Elapsed time (s)
Base R                                                                                   20.076
sequential                                                                               20.145
multisession(workers = 2L)                                                               10.365
multisession(workers = 4L)                                                               10.404
list(multisession, multisession)                                                         10.651
list(tweak(multisession, workers = 2L), tweak(multisession, workers = 2L))                6.349
list(tweak(multisession, workers = 2L), tweak(multisession, workers = I(2L)))             6.396
list(tweak(callr, workers = 2L), tweak(callr, workers = 2L))                              6.299